diff --git a/internal/pkg/kafkaclient/manager.go b/internal/pkg/kafkaclient/manager.go new file mode 100644 index 0000000..2341ad0 --- /dev/null +++ b/internal/pkg/kafkaclient/manager.go @@ -0,0 +1,113 @@ +package kafkaclient + +import ( + "fmt" + "strings" + "sync" + "time" + + "github.com/segmentio/kafka-go" +) + +type KafkaReadersMap struct { + KafkaReadersLock sync.RWMutex + KafkaReaders map[string]*kafka.Reader +} + +type KafkaWritersMap struct { + KafkaWritersLock sync.RWMutex + KafkaWriters map[string]*kafka.Writer +} + +type KafkaManager struct { + kafkaReadersMap KafkaReadersMap + kafkaWritersMap KafkaWritersMap +} + +func InitKafkaManager() *KafkaManager { + return &KafkaManager{ + kafkaReadersMap: KafkaReadersMap{ + KafkaReaders: make(map[string]*kafka.Reader), + }, + kafkaWritersMap: KafkaWritersMap{ + KafkaWriters: make(map[string]*kafka.Writer), + }, + } +} + +func (m *KafkaManager) AddKafkaWriter(kafkaUrl, topic string) { + kafkaWriter := &kafka.Writer{ + Addr: kafka.TCP(kafkaUrl), + Topic: topic, + Balancer: &kafka.LeastBytes{}, + Async: false, + RequiredAcks: kafka.RequireAll, + BatchSize: 100, + BatchTimeout: 10 * time.Millisecond, + } + + m.kafkaWritersMap.KafkaWritersLock.Lock() + m.kafkaWritersMap.KafkaWriters[topic] = kafkaWriter + m.kafkaWritersMap.KafkaWritersLock.Unlock() +} + +func (m *KafkaManager) CleanKafkaWriters() { + fmt.Println("shutdown of kafka readers starts") + m.kafkaWritersMap.KafkaWritersLock.Lock() + for _, r := range m.kafkaWritersMap.KafkaWriters { + if err := r.Close(); err != nil { + fmt.Printf("Error in closing kafka writer %v", err) + } + } + m.kafkaWritersMap.KafkaWritersLock.Unlock() + fmt.Println("Kafka writers graceful shutdown complete") +} + +func (m *KafkaManager) AddKafkaReader(kafkaUrl, topic, groupID string) { + brokers := strings.Split(kafkaUrl, ",") + kafkaReader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: brokers, + GroupID: groupID, + Topic: topic, + MinBytes: 1, + MaxBytes: 10e6, + }) + + m.kafkaReadersMap.KafkaReadersLock.Lock() + m.kafkaReadersMap.KafkaReaders[topic] = kafkaReader + m.kafkaReadersMap.KafkaReadersLock.Unlock() +} + +func (m *KafkaManager) CleanKafkaReaders() { + m.kafkaReadersMap.KafkaReadersLock.Lock() + for _, r := range m.kafkaReadersMap.KafkaReaders { + if err := r.Close(); err != nil { + fmt.Printf("Error in closing kafka reader %v", err) + } + } + m.kafkaReadersMap.KafkaReadersLock.Unlock() + fmt.Println("Kafka readers graceful shutdown complete") +} + +func (m *KafkaManager) PopulateKafkaManager(url, name string, topics []string) { + for _, topic := range topics { + if name != "" { + gid := fmt.Sprintf("%s-%s", topic, name) + m.AddKafkaReader(url, topic, gid) + } else { + m.AddKafkaWriter(url, topic) + } + } +} + +func (m *KafkaManager) GetReader(topic string) *kafka.Reader { + m.kafkaReadersMap.KafkaReadersLock.Lock() + defer m.kafkaReadersMap.KafkaReadersLock.Unlock() + return m.kafkaReadersMap.KafkaReaders[topic] +} + +func (m *KafkaManager) GetWriter(topic string) *kafka.Writer { + m.kafkaWritersMap.KafkaWritersLock.Lock() + defer m.kafkaWritersMap.KafkaWritersLock.Unlock() + return m.kafkaWritersMap.KafkaWriters[topic] +} diff --git a/tests/decoder/decode_test.go b/tests/decoder/decode_test.go new file mode 100644 index 0000000..0bbc9e2 --- /dev/null +++ b/tests/decoder/decode_test.go @@ -0,0 +1,364 @@ +package decoder + +import ( + "bytes" + "testing" + + "github.com/AFASystems/presence/internal/pkg/common/appcontext" + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/segmentio/kafka-go" +) + +func TestDecodeBeacon_EmptyData(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} + parserRegistry := &model.ParserRegistry{} + + adv := model.BeaconAdvertisement{ + ID: "test-beacon", + Data: "", // Empty data + } + + // Execute + err := decodeBeacon(adv, appState, mockWriter, parserRegistry) + + // Assert + if err != nil { + t.Errorf("Expected no error for empty data, got %v", err) + } + + if len(mockWriter.Messages) != 0 { + t.Errorf("Expected no messages for empty data, got %d", len(mockWriter.Messages)) + } +} + +func TestDecodeBeacon_WhitespaceOnly(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} + parserRegistry := &model.ParserRegistry{} + + adv := model.BeaconAdvertisement{ + ID: "test-beacon", + Data: " ", // Whitespace only + } + + // Execute + err := decodeBeacon(adv, appState, mockWriter, parserRegistry) + + // Assert + if err != nil { + t.Errorf("Expected no error for whitespace-only data, got %v", err) + } + + if len(mockWriter.Messages) != 0 { + t.Errorf("Expected no messages for whitespace-only data, got %d", len(mockWriter.Messages)) + } +} + +func TestDecodeBeacon_InvalidHex(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} + parserRegistry := &model.ParserRegistry{} + + adv := model.BeaconAdvertisement{ + ID: "test-beacon", + Data: "INVALID_HEX_DATA!!!", + } + + // Execute + err := decodeBeacon(adv, appState, mockWriter, parserRegistry) + + // Assert + if err == nil { + t.Error("Expected error for invalid hex data, got nil") + } + + if len(mockWriter.Messages) != 0 { + t.Errorf("Expected no messages for invalid hex, got %d", len(mockWriter.Messages)) + } +} + +func TestDecodeBeacon_ValidHexNoParser(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} + parserRegistry := &model.ParserRegistry{} // No parsers registered + + // Valid hex but no matching parser + adv := model.BeaconAdvertisement{ + ID: "test-beacon", + Data: "0201060302A0", // Valid AD structure + } + + // Execute + err := decodeBeacon(adv, appState, mockWriter, parserRegistry) + + // Assert + if err != nil { + t.Errorf("Expected no error when no parser matches, got %v", err) + } + + if len(mockWriter.Messages) != 0 { + t.Errorf("Expected no messages when no parser matches, got %d", len(mockWriter.Messages)) + } +} + +func TestDecodeBeacon_Deduplication(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} + parserRegistry := &model.ParserRegistry{} + + // Register a test parser + config := model.Config{ + Name: "test-parser", + Prefix: "02", + Length: 2, + } + parserRegistry.Register("test-parser", config) + + // Create an event that will be parsed + adv := model.BeaconAdvertisement{ + ID: "test-beacon", + Data: "020106", // Simple AD structure + } + + // First processing - should publish + err := decodeBeacon(adv, appState, mockWriter, parserRegistry) + if err != nil { + t.Fatalf("First processing failed: %v", err) + } + + firstMessageCount := len(mockWriter.Messages) + + // Second processing with identical data - should deduplicate + err = decodeBeacon(adv, appState, mockWriter, parserRegistry) + if err != nil { + t.Fatalf("Second processing failed: %v", err) + } + + // Assert - message count should not have changed + if len(mockWriter.Messages) != firstMessageCount { + t.Errorf("Expected deduplication, got %d messages (should be %d)", len(mockWriter.Messages), firstMessageCount) + } +} + +func TestDecodeBeacon_DifferentDataPublishes(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} + parserRegistry := &model.ParserRegistry{} + + // Register a test parser + config := model.Config{ + Name: "test-parser", + Prefix: "02", + Length: 2, + } + parserRegistry.Register("test-parser", config) + + // First processing + adv1 := model.BeaconAdvertisement{ + ID: "test-beacon", + Data: "020106", + } + + err := decodeBeacon(adv1, appState, mockWriter, parserRegistry) + if err != nil { + t.Fatalf("First processing failed: %v", err) + } + + firstMessageCount := len(mockWriter.Messages) + + // Second processing with different data - should publish again + adv2 := model.BeaconAdvertisement{ + ID: "test-beacon", + Data: "020107", // Different data + } + + err = decodeBeacon(adv2, appState, mockWriter, parserRegistry) + if err != nil { + t.Fatalf("Second processing failed: %v", err) + } + + // Assert - message count should have increased + if len(mockWriter.Messages) != firstMessageCount+1 { + t.Errorf("Expected new message for different data, got %d messages (expected %d)", len(mockWriter.Messages), firstMessageCount+1) + } +} + +func TestDecodeBeacon_WithFlagBytes(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} + parserRegistry := &model.ParserRegistry{} + + // Register a test parser + config := model.Config{ + Name: "test-parser", + Prefix: "02", + Length: 2, + } + parserRegistry.Register("test-parser", config) + + // Data with flag bytes (0x01 at position 1) + adv := model.BeaconAdvertisement{ + ID: "test-beacon", + Data: "0201060302A0", // Will have flags removed + } + + // Execute + err := decodeBeacon(adv, appState, mockWriter, parserRegistry) + + // Assert - should process successfully after flag removal + if err != nil { + t.Errorf("Expected no error with flag bytes, got %v", err) + } +} + +func TestDecodeBeacon_MultipleBeacons(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} + parserRegistry := &model.ParserRegistry{} + + // Register a test parser + config := model.Config{ + Name: "test-parser", + Prefix: "02", + Length: 2, + } + parserRegistry.Register("test-parser", config) + + // Process multiple different beacons + beacons := []model.BeaconAdvertisement{ + {ID: "beacon-1", Data: "020106"}, + {ID: "beacon-2", Data: "020107"}, + {ID: "beacon-3", Data: "020108"}, + } + + for _, adv := range beacons { + err := decodeBeacon(adv, appState, mockWriter, parserRegistry) + if err != nil { + t.Errorf("Failed to process beacon %s: %v", adv.ID, err) + } + } + + // Each unique beacon should produce a message + if len(mockWriter.Messages) != len(beacons) { + t.Errorf("Expected %d messages, got %d", len(beacons), len(mockWriter.Messages)) + } +} + +func TestProcessIncoming_ErrorHandling(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} + parserRegistry := &model.ParserRegistry{} + + // Invalid data that will cause an error + adv := model.BeaconAdvertisement{ + ID: "test-beacon", + Data: "INVALID_HEX", + } + + // Execute - should not panic, just handle error + processIncoming(adv, appState, mockWriter, parserRegistry) + + // Assert - no messages should be written + if len(mockWriter.Messages) != 0 { + t.Errorf("Expected no messages on error, got %d", len(mockWriter.Messages)) + } +} + +func TestDecodeBeacon_EventHashing(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} + parserRegistry := &model.ParserRegistry{} + + // Register a test parser that creates consistent events + config := model.Config{ + Name: "test-parser", + Prefix: "02", + Length: 2, + } + parserRegistry.Register("test-parser", config) + + adv := model.BeaconAdvertisement{ + ID: "test-beacon", + Data: "020106", + } + + // First processing + err := decodeBeacon(adv, appState, mockWriter, parserRegistry) + if err != nil { + t.Fatalf("First processing failed: %v", err) + } + + // Get the event from appState + event, exists := appState.GetBeaconEvent("test-beacon") + if !exists { + t.Fatal("Event should exist in appState") + } + + // Verify hash is created + hash := event.Hash() + if hash == nil || len(hash) == 0 { + t.Error("Expected non-empty hash") + } + + // Second processing should be deduplicated based on hash + err = decodeBeacon(adv, appState, mockWriter, parserRegistry) + if err != nil { + t.Fatalf("Second processing failed: %v", err) + } + + // Should still have only one message + if len(mockWriter.Messages) != 1 { + t.Errorf("Expected 1 message after deduplication, got %d", len(mockWriter.Messages)) + } +} + +func TestDecodeBeacon_VariousHexFormats(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} + parserRegistry := &model.ParserRegistry{} + + testCases := []struct { + name string + hexData string + shouldError bool + }{ + {"lowercase hex", "020106aa", false}, + {"uppercase hex", "020106AA", false}, + {"mixed case", "020106AaFf", false}, + {"with spaces", " 020106 ", false}, + {"odd length", "02016", true}, + {"invalid chars", "020106ZZ", true}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + adv := model.BeaconAdvertisement{ + ID: "test-beacon", + Data: tc.hexData, + } + + err := decodeBeacon(adv, appState, mockWriter, parserRegistry) + + if tc.shouldError && err == nil { + t.Errorf("Expected error for %s, got nil", tc.name) + } + + if !tc.shouldError && err != nil && !bytes.Contains(err.Error(), []byte("no parser")) { + // Error is OK if it's "no parser", but not for hex decoding + t.Logf("Got expected error for %s: %v", tc.name, err) + } + }) + } +} diff --git a/tests/decoder/event_loop_test.go b/tests/decoder/event_loop_test.go new file mode 100644 index 0000000..b622724 --- /dev/null +++ b/tests/decoder/event_loop_test.go @@ -0,0 +1,369 @@ +package decoder + +import ( + "context" + "testing" + "time" + + "github.com/AFASystems/presence/internal/pkg/common/appcontext" + "github.com/AFASystems/presence/internal/pkg/model" +) + +func TestEventLoop_RawMessageProcessing(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} + parserRegistry := &model.ParserRegistry{} + + chRaw := make(chan model.BeaconAdvertisement, 10) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Create a test message + msg := model.BeaconAdvertisement{ + ID: "test-beacon", + Data: "020106", + } + + // Simulate event loop processing + go func() { + for { + select { + case <-ctx.Done(): + return + case m := <-chRaw: + processIncoming(m, appState, mockWriter, parserRegistry) + } + } + }() + + // Send message + chRaw <- msg + + // Give it time to process + time.Sleep(100 * time.Millisecond) + + // Cancel context + cancel() + + // Verify message was processed (even if no parser matched, processIncoming was called) + // We just verify no panic occurred +} + +func TestEventLoop_ParserRegistryUpdates(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + parserRegistry := &model.ParserRegistry{} + + chParser := make(chan model.KafkaParser, 10) + + // Test ADD operation + addMsg := model.KafkaParser{ + ID: "add", + Name: "new-parser", + Config: model.Config{ + Name: "new-parser", + Prefix: "02", + Length: 2, + }, + } + + chParser <- addMsg + + // Simulate event loop handling + select { + case msg := <-chParser: + switch msg.ID { + case "add": + config := msg.Config + parserRegistry.Register(config.Name, config) + case "delete": + parserRegistry.Unregister(msg.Name) + case "update": + config := msg.Config + parserRegistry.Register(config.Name, config) + } + case <-time.After(1 * time.Second): + t.Fatal("Timeout waiting for parser message") + } + + // Verify parser was added + if len(parserRegistry.ParserList) != 1 { + t.Errorf("Expected 1 parser after add, got %d", len(parserRegistry.ParserList)) + } + + // Test DELETE operation + deleteMsg := model.KafkaParser{ + ID: "delete", + Name: "new-parser", + } + + chParser <- deleteMsg + + select { + case msg := <-chParser: + switch msg.ID { + case "add": + config := msg.Config + parserRegistry.Register(config.Name, config) + case "delete": + parserRegistry.Unregister(msg.Name) + case "update": + config := msg.Config + parserRegistry.Register(config.Name, config) + } + case <-time.After(1 * time.Second): + t.Fatal("Timeout waiting for parser message") + } + + // Verify parser was deleted + if len(parserRegistry.ParserList) != 0 { + t.Errorf("Expected 0 parsers after delete, got %d", len(parserRegistry.ParserList)) + } +} + +func TestEventLoop_UpdateParser(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + parserRegistry := &model.ParserRegistry{} + + // Add initial parser + parserRegistry.Register("test-parser", model.Config{ + Name: "test-parser", + Prefix: "02", + Length: 2, + }) + + chParser := make(chan model.KafkaParser, 10) + + // Test UPDATE operation + updateMsg := model.KafkaParser{ + ID: "update", + Name: "test-parser", + Config: model.Config{ + Name: "test-parser", + Prefix: "03", + Length: 3, + }, + } + + chParser <- updateMsg + + // Simulate event loop handling + select { + case msg := <-chParser: + switch msg.ID { + case "add": + config := msg.Config + parserRegistry.Register(config.Name, config) + case "delete": + parserRegistry.Unregister(msg.Name) + case "update": + config := msg.Config + parserRegistry.Register(config.Name, config) + } + case <-time.After(1 * time.Second): + t.Fatal("Timeout waiting for parser message") + } + + // Verify parser still exists (was updated, not deleted) + if len(parserRegistry.ParserList) != 1 { + t.Errorf("Expected 1 parser after update, got %d", len(parserRegistry.ParserList)) + } + + if _, exists := parserRegistry.ParserList["test-parser"]; !exists { + t.Error("Parser should still exist after update") + } +} + +func TestEventLoop_MultipleParserOperations(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + parserRegistry := &model.ParserRegistry{} + + chParser := make(chan model.KafkaParser, 10) + + // Send multiple operations + operations := []model.KafkaParser{ + {ID: "add", Name: "parser-1", Config: model.Config{Name: "parser-1", Prefix: "02", Length: 2}}, + {ID: "add", Name: "parser-2", Config: model.Config{Name: "parser-2", Prefix: "03", Length: 3}}, + {ID: "add", Name: "parser-3", Config: model.Config{Name: "parser-3", Prefix: "04", Length: 4}}, + {ID: "delete", Name: "parser-2"}, + {ID: "update", Name: "parser-1", Config: model.Config{Name: "parser-1", Prefix: "05", Length: 5}}, + } + + for _, op := range operations { + chParser <- op + } + + // Process all operations + for i := 0; i < len(operations); i++ { + select { + case msg := <-chParser: + switch msg.ID { + case "add": + config := msg.Config + parserRegistry.Register(config.Name, config) + case "delete": + parserRegistry.Unregister(msg.Name) + case "update": + config := msg.Config + parserRegistry.Register(config.Name, config) + } + case <-time.After(1 * time.Second): + t.Fatalf("Timeout processing operation %d", i) + } + } + + // Verify final state + if len(parserRegistry.ParserList) != 2 { + t.Errorf("Expected 2 parsers after all operations, got %d", len(parserRegistry.ParserList)) + } + + // parser-1 should exist (updated) + if _, exists := parserRegistry.ParserList["parser-1"]; !exists { + t.Error("parser-1 should exist") + } + + // parser-2 should not exist (deleted) + if _, exists := parserRegistry.ParserList["parser-2"]; exists { + t.Error("parser-2 should not exist") + } + + // parser-3 should exist (added) + if _, exists := parserRegistry.ParserList["parser-3"]; !exists { + t.Error("parser-3 should exist") + } +} + +func TestEventLoop_ContextCancellation(t *testing.T) { + // Setup + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + chRaw := make(chan model.BeaconAdvertisement, 10) + chParser := make(chan model.KafkaParser, 10) + + // Cancel immediately + cancel() + + // Verify context is cancelled + select { + case <-ctx.Done(): + // Expected - context was cancelled + return + case msg := <-chRaw: + t.Errorf("Should not receive raw messages after context cancellation, got: %+v", msg) + case msg := <-chParser: + t.Errorf("Should not receive parser messages after context cancellation, got: %+v", msg) + case <-time.After(1 * time.Second): + t.Error("Timeout - context cancellation should have been immediate") + } +} + +func TestEventLoop_ChannelBuffering(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} + parserRegistry := &model.ParserRegistry{} + + // Create buffered channels (like in main) + chRaw := make(chan model.BeaconAdvertisement, 2000) + chParser := make(chan model.KafkaParser, 200) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Send multiple messages without blocking + for i := 0; i < 100; i++ { + msg := model.BeaconAdvertisement{ + ID: "test-beacon", + Data: "020106", + } + chRaw <- msg + } + + // Verify all messages are buffered + if len(chRaw) != 100 { + t.Errorf("Expected 100 messages in buffer, got %d", len(chRaw)) + } + + // Send parser updates + for i := 0; i < 10; i++ { + msg := model.KafkaParser{ + ID: "add", + Name: "parser-" + string(rune('A'+i)), + Config: model.Config{ + Name: "parser-" + string(rune('A'+i)), + Prefix: "02", + Length: 2, + }, + } + chParser <- msg + } + + // Verify all parser messages are buffered + if len(chParser) != 10 { + t.Errorf("Expected 10 parser messages in buffer, got %d", len(chParser)) + } + + // Cancel context + cancel() +} + +func TestEventLoop_ParserAndRawChannels(t *testing.T) { + // Setup + appState := appcontext.NewAppState() + mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}} + parserRegistry := &model.ParserRegistry{} + + chRaw := make(chan model.BeaconAdvertisement, 10) + chParser := make(chan model.KafkaParser, 10) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Send both raw and parser messages + rawMsg := model.BeaconAdvertisement{ + ID: "test-beacon", + Data: "020106", + } + + parserMsg := model.KafkaParser{ + ID: "add", + Name: "test-parser", + Config: model.Config{ + Name: "test-parser", + Prefix: "02", + Length: 2, + }, + } + + chRaw <- rawMsg + chParser <- parserMsg + + // Process both messages + processedRaw := false + processedParser := false + + for i := 0; i < 2; i++ { + select { + case <-chRaw: + processedRaw = true + case <-chParser: + processedParser = true + case <-time.After(1 * time.Second): + t.Fatal("Timeout waiting for messages") + } + } + + if !processedRaw { + t.Error("Raw message should have been processed") + } + + if !processedParser { + t.Error("Parser message should have been processed") + } + + cancel() +} diff --git a/tests/decoder/integration_test.go b/tests/decoder/integration_test.go new file mode 100644 index 0000000..7790448 --- /dev/null +++ b/tests/decoder/integration_test.go @@ -0,0 +1,418 @@ +package decoder + +import ( + "context" + "encoding/json" + "os" + "testing" + "time" + + "github.com/AFASystems/presence/internal/pkg/common/appcontext" + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/segmentio/kafka-go" +) + +// TestIntegration_DecoderEndToEnd tests the complete decoder flow +func TestIntegration_DecoderEndToEnd(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + // Check if Kafka is available + kafkaURL := os.Getenv("KAFKA_URL") + if kafkaURL == "" { + kafkaURL = "localhost:9092" + } + + // Create test topics + rawTopic := "test-rawbeacons-" + time.Now().Format("20060102150405") + alertTopic := "test-alertbeacons-" + time.Now().Format("20060102150405") + + // Setup + appState := appcontext.NewAppState() + parserRegistry := &model.ParserRegistry{} + + // Register a test parser + config := model.Config{ + Name: "integration-test-parser", + Prefix: "02", + Length: 2, + MinLength: 2, + MaxLength: 20, + } + parserRegistry.Register("integration-test-parser", config) + + // Create Kafka writer + writer := kafka.NewWriter(kafka.WriterConfig{ + Brokers: []string{kafkaURL}, + Topic: alertTopic, + }) + defer writer.Close() + + // Create Kafka reader to verify messages + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{kafkaURL}, + Topic: alertTopic, + GroupID: "test-group-" + time.Now().Format("20060102150405"), + }) + defer reader.Close() + + // Create a test beacon advertisement + adv := model.BeaconAdvertisement{ + ID: "integration-test-beacon", + Data: "020106", // Valid hex data + } + + // Process the beacon + err := decodeBeacon(adv, appState, writer, parserRegistry) + if err != nil { + t.Logf("Decode beacon returned error (may be expected if no parser matches): %v", err) + } + + // Give Kafka time to propagate + time.Sleep(1 * time.Second) + + // Verify event was stored in appState + event, exists := appState.GetBeaconEvent("integration-test-beacon") + if exists { + t.Logf("Event stored in appState: %+v", event) + } +} + +// TestIntegration_ParserRegistryOperations tests parser registry with real Kafka +func TestIntegration_ParserRegistryOperations(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + kafkaURL := os.Getenv("KAFKA_URL") + if kafkaURL == "" { + kafkaURL = "localhost:9092" + } + + alertTopic := "test-alertbeacons-registry-" + time.Now().Format("20060102150405") + + // Setup + appState := appcontext.NewAppState() + parserRegistry := &model.ParserRegistry{} + + writer := kafka.NewWriter(kafka.WriterConfig{ + Brokers: []string{kafkaURL}, + Topic: alertTopic, + }) + defer writer.Close() + + // Test parser registration through Kafka message flow + parserMsg := model.KafkaParser{ + ID: "add", + Name: "kafka-test-parser", + Config: model.Config{ + Name: "kafka-test-parser", + Prefix: "02", + Length: 2, + MinLength: 2, + MaxLength: 20, + }, + } + + // Simulate parser registry update + switch parserMsg.ID { + case "add": + config := parserMsg.Config + parserRegistry.Register(config.Name, config) + case "delete": + parserRegistry.Unregister(parserMsg.Name) + case "update": + config := parserMsg.Config + parserRegistry.Register(config.Name, config) + } + + // Verify parser was registered + if len(parserRegistry.ParserList) != 1 { + t.Errorf("Expected 1 parser in registry, got %d", len(parserRegistry.ParserList)) + } + + if _, exists := parserRegistry.ParserList["kafka-test-parser"]; !exists { + t.Error("Parser should exist in registry") + } +} + +// TestIntegration_MultipleBeaconsSequential tests processing multiple beacons +func TestIntegration_MultipleBeaconsSequential(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + kafkaURL := os.Getenv("KAFKA_URL") + if kafkaURL == "" { + kafkaURL = "localhost:9092" + } + + alertTopic := "test-alertbeacons-multi-" + time.Now().Format("20060102150405") + + // Setup + appState := appcontext.NewAppState() + parserRegistry := &model.ParserRegistry{} + + // Register parser + config := model.Config{ + Name: "multi-test-parser", + Prefix: "02", + Length: 2, + MinLength: 2, + MaxLength: 20, + } + parserRegistry.Register("multi-test-parser", config) + + writer := kafka.NewWriter(kafka.WriterConfig{ + Brokers: []string{kafkaURL}, + Topic: alertTopic, + }) + defer writer.Close() + + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{kafkaURL}, + Topic: alertTopic, + GroupID: "test-group-multi-" + time.Now().Format("20060102150405"), + MinBytes: 10e3, + MaxBytes: 10e6, + }) + defer reader.Close() + + // Process multiple beacons + beacons := []model.BeaconAdvertisement{ + {ID: "beacon-1", Data: "020106"}, + {ID: "beacon-2", Data: "020107"}, + {ID: "beacon-3", Data: "020108"}, + } + + for _, adv := range beacons { + err := decodeBeacon(adv, appState, writer, parserRegistry) + if err != nil { + t.Logf("Processing beacon %s returned error: %v", adv.ID, err) + } + } + + // Give Kafka time to propagate + time.Sleep(2 * time.Second) + + // Verify events in appState + for _, adv := range beacons { + event, exists := appState.GetBeaconEvent(adv.ID) + if exists { + t.Logf("Event for %s: %+v", adv.ID, event) + } + } +} + +// TestIntegration_EventDeduplication tests that duplicate events are not published +func TestIntegration_EventDeduplication(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + kafkaURL := os.Getenv("KAFKA_URL") + if kafkaURL == "" { + kafkaURL = "localhost:9092" + } + + alertTopic := "test-alertbeacons-dedup-" + time.Now().Format("20060102150405") + + // Setup + appState := appcontext.NewAppState() + parserRegistry := &model.ParserRegistry{} + + // Register parser + config := model.Config{ + Name: "dedup-test-parser", + Prefix: "02", + Length: 2, + MinLength: 2, + MaxLength: 20, + } + parserRegistry.Register("dedup-test-parser", config) + + writer := kafka.NewWriter(kafka.WriterConfig{ + Brokers: []string{kafkaURL}, + Topic: alertTopic, + }) + defer writer.Close() + + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{kafkaURL}, + Topic: alertTopic, + GroupID: "test-group-dedup-" + time.Now().Format("20060102150405"), + }) + defer reader.Close() + + // Create identical beacon advertisement + adv := model.BeaconAdvertisement{ + ID: "dedup-test-beacon", + Data: "020106", + } + + // Process first time + err := decodeBeacon(adv, appState, writer, parserRegistry) + if err != nil { + t.Logf("First processing returned error: %v", err) + } + + // Process second time with identical data + err = decodeBeacon(adv, appState, writer, parserRegistry) + if err != nil { + t.Logf("Second processing returned error: %v", err) + } + + // Give Kafka time to propagate + time.Sleep(1 * time.Second) + + // Try to read from Kafka - should have at most 1 message due to deduplication + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + messageCount := 0 + for { + msg, err := reader.ReadMessage(ctx) + if err != nil { + break + } + + messageCount++ + t.Logf("Read message %d: %s", messageCount, string(msg.Value)) + + if messageCount > 1 { + t.Error("Expected at most 1 message due to deduplication, got more") + break + } + } + + t.Logf("Total messages read: %d", messageCount) +} + +// TestIntegration_AppStatePersistence tests that events persist in AppState +func TestIntegration_AppStatePersistence(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + kafkaURL := os.Getenv("KAFKA_URL") + if kafkaURL == "" { + kafkaURL = "localhost:9092" + } + + alertTopic := "test-alertbeacons-persist-" + time.Now().Format("20060102150405") + + // Setup + appState := appcontext.NewAppState() + parserRegistry := &model.ParserRegistry{} + + config := model.Config{ + Name: "persist-test-parser", + Prefix: "02", + Length: 2, + MinLength: 2, + MaxLength: 20, + } + parserRegistry.Register("persist-test-parser", config) + + writer := kafka.NewWriter(kafka.WriterConfig{ + Brokers: []string{kafkaURL}, + Topic: alertTopic, + }) + defer writer.Close() + + // Process beacon + adv := model.BeaconAdvertisement{ + ID: "persist-test-beacon", + Data: "020106", + } + + err := decodeBeacon(adv, appState, writer, parserRegistry) + if err != nil { + t.Logf("Processing returned error: %v", err) + } + + // Verify event persists in AppState + event, exists := appState.GetBeaconEvent("persist-test-beacon") + if !exists { + t.Error("Event should exist in AppState after processing") + } else { + t.Logf("Event persisted: ID=%s, Type=%s, Battery=%d", + event.ID, event.Type, event.Battery) + + // Verify event can be serialized to JSON + jsonData, err := event.ToJSON() + if err != nil { + t.Errorf("Failed to serialize event to JSON: %v", err) + } else { + t.Logf("Event JSON: %s", string(jsonData)) + } + } +} + +// TestIntegration_ParserUpdateFlow tests updating parsers during runtime +func TestIntegration_ParserUpdateFlow(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + kafkaURL := os.Getenv("KAFKA_URL") + if kafkaURL == "" { + kafkaURL = "localhost:9092" + } + + alertTopic := "test-alertbeacons-update-" + time.Now().Format("20060102150405") + + // Setup + appState := appcontext.NewAppState() + parserRegistry := &model.ParserRegistry{} + + writer := kafka.NewWriter(kafka.WriterConfig{ + Brokers: []string{kafkaURL}, + Topic: alertTopic, + }) + defer writer.Close() + + // Initial parser config + config1 := model.Config{ + Name: "update-test-parser", + Prefix: "02", + Length: 2, + MinLength: 2, + MaxLength: 20, + } + parserRegistry.Register("update-test-parser", config1) + + // Process with initial config + adv := model.BeaconAdvertisement{ + ID: "update-test-beacon", + Data: "020106", + } + + err := decodeBeacon(adv, appState, writer, parserRegistry) + t.Logf("First processing: %v", err) + + // Update parser config + config2 := model.Config{ + Name: "update-test-parser", + Prefix: "03", + Length: 3, + MinLength: 3, + MaxLength: 25, + } + parserRegistry.Register("update-test-parser", config2) + + // Process again with updated config + adv2 := model.BeaconAdvertisement{ + ID: "update-test-beacon-2", + Data: "030107", + } + + err = decodeBeacon(adv2, appState, writer, parserRegistry) + t.Logf("Second processing with updated parser: %v", err) + + // Verify parser still exists + if _, exists := parserRegistry.ParserList["update-test-parser"]; !exists { + t.Error("Parser should exist after update") + } +} diff --git a/tests/decoder/parser_registry_test.go b/tests/decoder/parser_registry_test.go new file mode 100644 index 0000000..9aca00b --- /dev/null +++ b/tests/decoder/parser_registry_test.go @@ -0,0 +1,275 @@ +package decoder + +import ( + "testing" + + "github.com/AFASystems/presence/internal/pkg/model" +) + +func TestParserRegistry_AddParser(t *testing.T) { + // Setup + registry := &model.ParserRegistry{} + + // Add a parser + config := model.Config{ + Name: "test-parser", + Prefix: "02", + Length: 2, + } + + registry.Register("test-parser", config) + + // Verify parser was added + if len(registry.ParserList) != 1 { + t.Errorf("Expected 1 parser in registry, got %d", len(registry.ParserList)) + } + + if _, exists := registry.ParserList["test-parser"]; !exists { + t.Error("Parser 'test-parser' should exist in registry") + } +} + +func TestParserRegistry_RemoveParser(t *testing.T) { + // Setup + registry := &model.ParserRegistry{} + + config := model.Config{ + Name: "test-parser", + Prefix: "02", + Length: 2, + } + + registry.Register("test-parser", config) + + // Remove parser + registry.Unregister("test-parser") + + // Verify parser was removed + if len(registry.ParserList) != 0 { + t.Errorf("Expected 0 parsers in registry, got %d", len(registry.ParserList)) + } + + if _, exists := registry.ParserList["test-parser"]; exists { + t.Error("Parser 'test-parser' should not exist in registry") + } +} + +func TestParserRegistry_UpdateParser(t *testing.T) { + // Setup + registry := &model.ParserRegistry{} + + // Add initial parser + config1 := model.Config{ + Name: "test-parser", + Prefix: "02", + Length: 2, + } + + registry.Register("test-parser", config1) + + // Update parser + config2 := model.Config{ + Name: "test-parser", + Prefix: "03", + Length: 3, + } + + registry.Register("test-parser", config2) + + // Verify only one parser exists + if len(registry.ParserList) != 1 { + t.Errorf("Expected 1 parser in registry, got %d", len(registry.ParserList)) + } + + // Verify it was updated (the new config should be used) + if _, exists := registry.ParserList["test-parser"]; !exists { + t.Error("Parser 'test-parser' should exist in registry") + } +} + +func TestParserRegistry_MultipleParsers(t *testing.T) { + // Setup + registry := &model.ParserRegistry{} + + // Add multiple parsers + parsers := []model.Config{ + {Name: "parser-1", Prefix: "02", Length: 2}, + {Name: "parser-2", Prefix: "03", Length: 3}, + {Name: "parser-3", Prefix: "04", Length: 4}, + } + + for _, p := range parsers { + registry.Register(p.Name, p) + } + + // Verify all parsers were added + if len(registry.ParserList) != 3 { + t.Errorf("Expected 3 parsers in registry, got %d", len(registry.ParserList)) + } + + for _, p := range parsers { + if _, exists := registry.ParserList[p.Name]; !exists { + t.Errorf("Parser '%s' should exist in registry", p.Name) + } + } +} + +func TestParserRegistry_RemoveNonExistent(t *testing.T) { + // Setup + registry := &model.ParserRegistry{} + + // Try to remove non-existent parser - should not panic + registry.Unregister("non-existent") + + // Verify registry is still empty + if len(registry.ParserList) != 0 { + t.Errorf("Expected 0 parsers, got %d", len(registry.ParserList)) + } +} + +func TestParserRegistry_ConcurrentAccess(t *testing.T) { + // Setup + registry := &model.ParserRegistry{} + done := make(chan bool) + + // Concurrent additions + for i := 0; i < 10; i++ { + go func(index int) { + config := model.Config{ + Name: "parser-" + string(rune('A'+index)), + Prefix: "02", + Length: 2, + } + registry.Register(config.Name, config) + done <- true + }(i) + } + + // Wait for all goroutines + for i := 0; i < 10; i++ { + <-done + } + + // Verify all parsers were added + if len(registry.ParserList) != 10 { + t.Errorf("Expected 10 parsers, got %d", len(registry.ParserList)) + } +} + +func TestParserConfig_Structure(t *testing.T) { + config := model.Config{ + Name: "test-config", + Prefix: "0201", + MinLength: 10, + MaxLength: 30, + ParserType: "sensor", + } + + if config.Name != "test-config" { + t.Errorf("Expected name 'test-config', got '%s'", config.Name) + } + + if config.Prefix != "0201" { + t.Errorf("Expected prefix '0201', got '%s'", config.Prefix) + } + + if config.MinLength != 10 { + t.Errorf("Expected MinLength 10, got %d", config.MinLength) + } + + if config.MaxLength != 30 { + t.Errorf("Expected MaxLength 30, got %d", config.MaxLength) + } +} + +func TestKafkaParser_MessageTypes(t *testing.T) { + testCases := []struct { + name string + id string + config model.Config + expected string + }{ + { + name: "add parser", + id: "add", + config: model.Config{Name: "new-parser", Prefix: "02", Length: 2}, + expected: "add", + }, + { + name: "delete parser", + id: "delete", + config: model.Config{Name: "old-parser", Prefix: "02", Length: 2}, + expected: "delete", + }, + { + name: "update parser", + id: "update", + config: model.Config{Name: "updated-parser", Prefix: "03", Length: 3}, + expected: "update", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + msg := model.KafkaParser{ + ID: tc.id, + Name: tc.config.Name, + Config: tc.config, + } + + if msg.ID != tc.expected { + t.Errorf("Expected ID '%s', got '%s'", tc.expected, msg.ID) + } + + if msg.Name != tc.config.Name { + t.Errorf("Expected Name '%s', got '%s'", tc.config.Name, msg.Name) + } + }) + } +} + +func TestParserRegistry_EmptyRegistry(t *testing.T) { + // Setup empty registry + registry := &model.ParserRegistry{} + + // Verify it's empty + if len(registry.ParserList) != 0 { + t.Errorf("Expected empty registry, got %d parsers", len(registry.ParserList)) + } + + // Should be safe to call Unregister on empty registry + registry.Unregister("anything") +} + +func TestParserRegistry_ParserReplacement(t *testing.T) { + // Setup + registry := &model.ParserRegistry{} + + // Add parser with config 1 + config1 := model.Config{ + Name: "test-parser", + Prefix: "02", + Length: 2, + } + + registry.Register("test-parser", config1) + + // Replace with config 2 (same name) + config2 := model.Config{ + Name: "test-parser", + Prefix: "03", + Length: 3, + } + + registry.Register("test-parser", config2) + + // Verify only one entry exists + if len(registry.ParserList) != 1 { + t.Errorf("Expected 1 parser after replacement, got %d", len(registry.ParserList)) + } + + // Verify the parser still exists + if _, exists := registry.ParserList["test-parser"]; !exists { + t.Error("Parser 'test-parser' should still exist") + } +} diff --git a/tests/decoder/testutil.go b/tests/decoder/testutil.go new file mode 100644 index 0000000..ce977a1 --- /dev/null +++ b/tests/decoder/testutil.go @@ -0,0 +1,321 @@ +package decoder + +import ( + "context" + "testing" + + "github.com/AFASystems/presence/internal/pkg/common/appcontext" + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/segmentio/kafka-go" +) + +// MockKafkaWriter is a mock implementation of kafkaWriter for testing +type MockKafkaWriter struct { + Messages []kafka.Message +} + +func (m *MockKafkaWriter) WriteMessages(ctx context.Context, msgs ...kafka.Message) error { + m.Messages = append(m.Messages, msgs...) + return nil +} + +// TestHelper provides utility functions for decoder testing +type TestHelper struct { + t *testing.T + appState *appcontext.AppState + parserRegistry *model.ParserRegistry +} + +// NewTestHelper creates a new test helper instance +func NewTestHelper(t *testing.T) *TestHelper { + return &TestHelper{ + t: t, + appState: appcontext.NewAppState(), + parserRegistry: &model.ParserRegistry{}, + } +} + +// GetAppState returns the appState instance +func (th *TestHelper) GetAppState() *appcontext.AppState { + return th.appState +} + +// GetParserRegistry returns the parser registry +func (th *TestHelper) GetParserRegistry() *model.ParserRegistry { + return th.parserRegistry +} + +// RegisterTestParser registers a parser with default test configuration +func (th *TestHelper) RegisterTestParser(name string) { + config := model.Config{ + Name: name, + Min: 2, + Max: 20, + Pattern: []string{"02"}, + Configs: map[string]model.ParserConfig{ + "length": {Length: 2, Offset: 0, Order: "big"}, + }, + } + th.parserRegistry.Register(name, config) +} + +// CreateBeaconAdvertisement creates a test beacon advertisement +func (th *TestHelper) CreateBeaconAdvertisement(id, data string) model.BeaconAdvertisement { + return model.BeaconAdvertisement{ + ID: id, + Data: data, + } +} + +// CreateValidHexAdvertisement creates a beacon with valid hex data +func (th *TestHelper) CreateValidHexAdvertisement(id string) model.BeaconAdvertisement { + return model.BeaconAdvertisement{ + ID: id, + Data: "020106", + } +} + +// CreateInvalidHexAdvertisement creates a beacon with invalid hex data +func (th *TestHelper) CreateInvalidHexAdvertisement(id string) model.BeaconAdvertisement { + return model.BeaconAdvertisement{ + ID: id, + Data: "INVALID_HEX", + } +} + +// CreateEmptyAdvertisement creates a beacon with empty data +func (th *TestHelper) CreateEmptyAdvertisement(id string) model.BeaconAdvertisement { + return model.BeaconAdvertisement{ + ID: id, + Data: "", + } +} + +// AssertParserExists asserts that a parser exists in the registry +func (th *TestHelper) AssertParserExists(name string) { + if _, exists := th.parserRegistry.ParserList[name]; !exists { + th.t.Errorf("Parser '%s' should exist in registry", name) + } +} + +// AssertParserNotExists asserts that a parser does not exist in the registry +func (th *TestHelper) AssertParserNotExists(name string) { + if _, exists := th.parserRegistry.ParserList[name]; exists { + th.t.Errorf("Parser '%s' should not exist in registry", name) + } +} + +// AssertEventExists asserts that an event exists in appState +func (th *TestHelper) AssertEventExists(id string) model.BeaconEvent { + event, exists := th.appState.GetBeaconEvent(id) + if !exists { + th.t.Errorf("Event for beacon '%s' should exist in appState", id) + return model.BeaconEvent{} + } + return event +} + +// AssertEventNotExists asserts that an event does not exist in appState +func (th *TestHelper) AssertEventNotExists(id string) { + _, exists := th.appState.GetBeaconEvent(id) + if exists { + th.t.Errorf("Event for beacon '%s' should not exist in appState", id) + } +} + +// AssertParserCount asserts the number of parsers in the registry +func (th *TestHelper) AssertParserCount(expected int) { + if len(th.parserRegistry.ParserList) != expected { + th.t.Errorf("Expected %d parsers in registry, got %d", expected, len(th.parserRegistry.ParserList)) + } +} + +// Helper functions for creating test configurations + +// CreateTestConfig creates a test parser configuration +func CreateTestConfig(name string, min, max int, pattern []string) model.Config { + return model.Config{ + Name: name, + Min: min, + Max: max, + Pattern: pattern, + Configs: map[string]model.ParserConfig{ + "length": {Length: 2, Offset: 0, Order: "big"}, + }, + } +} + +// CreateKafkaParserMessage creates a Kafka parser message for testing +func CreateKafkaParserMessage(id, name string, config model.Config) model.KafkaParser { + return model.KafkaParser{ + ID: id, + Name: name, + Config: config, + } +} + +// AssertNoError asserts that an error is nil +func AssertNoError(t *testing.T, err error, msg string) { + if err != nil { + t.Errorf("%s: %v", msg, err) + } +} + +// AssertError asserts that an error is not nil +func AssertError(t *testing.T, err error, msg string) { + if err == nil { + t.Errorf("%s: expected error but got nil", msg) + } +} + +// Common test data + +// Valid hex strings for testing +var ValidHexStrings = []string{ + "020106", // Simple AD structure + "0201060302A0", // AD structure with flags + "1AFF0C01", // iBeacon-like data + "0201061AFF0C01", // Multiple AD structures +} + +// Invalid hex strings for testing +var InvalidHexStrings = []string{ + "INVALID_HEX", + "02016ZZZ", + "GGGGGG", + "NOT-HEX", +} + +// Empty or whitespace data for testing +var EmptyTestData = []string{ + "", + " ", + "\t\n", +} + +// CreateMockWriter creates a mock Kafka writer +func CreateMockWriter() *MockKafkaWriter { + return &MockKafkaWriter{Messages: []kafka.Message{}} +} + +// Beacon event test helpers + +// AssertEventFields asserts that event fields match expected values +func AssertEventFields(t *testing.T, event model.BeaconEvent, expectedID, expectedType string) { + if event.ID != expectedID { + t.Errorf("Expected event ID '%s', got '%s'", expectedID, event.ID) + } + + if event.Type != expectedType { + t.Errorf("Expected event type '%s', got '%s'", expectedType, event.Type) + } +} + +// SetupTestParsers registers a standard set of test parsers +func SetupTestParsers(registry *model.ParserRegistry) { + parsers := []model.Config{ + {Name: "parser-1", Min: 2, Max: 20, Pattern: []string{"02"}}, + {Name: "parser-2", Min: 3, Max: 25, Pattern: []string{"03"}}, + {Name: "parser-3", Min: 4, Max: 30, Pattern: []string{"04"}}, + } + + for _, p := range parsers { + registry.Register(p.Name, p) + } +} + +// CleanupTestParsers removes all parsers from the registry +func CleanupTestParsers(registry *model.ParserRegistry) { + for name := range registry.ParserList { + registry.Unregister(name) + } +} + +// CreateTestBeaconEvent creates a test beacon event +func CreateTestBeaconEvent(id, eventType string) model.BeaconEvent { + return model.BeaconEvent{ + ID: id, + Type: eventType, + Battery: 100, + Event: 1, + AccX: 0, + AccY: 0, + AccZ: 0, + } +} + +// AssertKafkaMessageCount asserts the number of Kafka messages +func AssertKafkaMessageCount(t *testing.T, writer *MockKafkaWriter, expected int) { + if len(writer.Messages) != expected { + t.Errorf("Expected %d Kafka message(s), got %d", expected, len(writer.Messages)) + } +} + +// AssertNoKafkaMessages asserts that no messages were written to Kafka +func AssertNoKafkaMessages(t *testing.T, writer *MockKafkaWriter) { + AssertKafkaMessageCount(t, writer, 0) +} + +// Parser registry test helpers + +// SimulateEventLoopParserUpdate simulates the event loop's parser update logic +func SimulateEventLoopParserUpdate(msg model.KafkaParser, registry *model.ParserRegistry) { + switch msg.ID { + case "add": + config := msg.Config + registry.Register(config.Name, config) + case "delete": + registry.Unregister(msg.Name) + case "update": + config := msg.Config + registry.Register(config.Name, config) + } +} + +// CreateParserAddMessage creates a parser add message +func CreateParserAddMessage(name string, min, max int) model.KafkaParser { + return model.KafkaParser{ + ID: "add", + Name: name, + Config: model.Config{ + Name: name, + Min: min, + Max: max, + Pattern: []string{"02"}, + }, + } +} + +// CreateParserDeleteMessage creates a parser delete message +func CreateParserDeleteMessage(name string) model.KafkaParser { + return model.KafkaParser{ + ID: "delete", + Name: name, + } +} + +// CreateParserUpdateMessage creates a parser update message +func CreateParserUpdateMessage(name string, min, max int) model.KafkaParser { + return model.KafkaParser{ + ID: "update", + Name: name, + Config: model.Config{ + Name: name, + Min: min, + Max: max, + Pattern: []string{"02"}, + }, + } +} + +// GenerateTestBeaconID generates a test beacon ID +func GenerateTestBeaconID(index int) string { + return "test-beacon-" + string(rune('A'+index)) +} + +// GenerateTestHexData generates test hex data +func GenerateTestHexData(index int) string { + prefix := "02" + value := string(rune('6' + index)) + return prefix + "01" + value +}