package decoder

import (
	"context"
	"testing"
	"time"

	"github.com/AFASystems/presence/internal/pkg/common/appcontext"
	"github.com/AFASystems/presence/internal/pkg/model"
)

// handleNextParserOp receives a single message from ch and applies it to reg
// using the add/delete/update dispatch that these tests use to simulate the
// event loop. It reports false when no message arrives within one second so
// that the caller can fail with its own message.
func handleNextParserOp(ch <-chan model.KafkaParser, reg *model.ParserRegistry) bool {
	select {
	case msg := <-ch:
		switch msg.ID {
		case "add", "update":
			// Both operations register the supplied config under its own name.
			config := msg.Config
			reg.Register(config.Name, config)
		case "delete":
			reg.Unregister(msg.Name)
		}
		return true
	case <-time.After(1 * time.Second):
		return false
	}
}

// TestEventLoop_RawMessageProcessing feeds one raw advertisement through a
// simulated event loop and checks that processIncoming returns for it and that
// the loop stops once the context is cancelled. No parser is registered, so the
// test only verifies that processing completes without panicking.
func TestEventLoop_RawMessageProcessing(t *testing.T) {
	// Setup
	appState := appcontext.NewAppState()
	mockWriter := &MockKafkaWriter{}
	parserRegistry := &model.ParserRegistry{}

	chRaw := make(chan model.BeaconAdvertisement, 10)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// processed is buffered so the loop never blocks on signalling; stopped is
	// closed when the goroutine exits so the test does not leak it.
	processed := make(chan struct{}, 1)
	stopped := make(chan struct{})

	// Simulate event loop processing
	go func() {
		defer close(stopped)
		for {
			select {
			case <-ctx.Done():
				return
			case m := <-chRaw:
				processIncoming(m, appState, mockWriter, parserRegistry)
				processed <- struct{}{}
			}
		}
	}()

	// Send message
	chRaw <- model.BeaconAdvertisement{
		ID:   "test-beacon",
		Data: "020106",
	}

	// Wait for the message to be handled instead of sleeping a fixed amount.
	select {
	case <-processed:
	case <-time.After(1 * time.Second):
		t.Fatal("Timeout waiting for raw message to be processed")
	}

	// Cancel context and make sure the loop actually terminates.
	cancel()
	select {
	case <-stopped:
	case <-time.After(1 * time.Second):
		t.Fatal("Timeout waiting for event loop to stop")
	}
}

// TestEventLoop_ParserRegistryUpdates checks that an "add" message followed by
// a "delete" message for the same parser leaves the registry with one and then
// zero entries.
func TestEventLoop_ParserRegistryUpdates(t *testing.T) {
	// Setup
	parserRegistry := &model.ParserRegistry{}
	chParser := make(chan model.KafkaParser, 10)

	// Test ADD operation
	chParser <- model.KafkaParser{
		ID:   "add",
		Name: "new-parser",
		Config: model.Config{
			Name:   "new-parser",
			Prefix: "02",
			Length: 2,
		},
	}

	if !handleNextParserOp(chParser, parserRegistry) {
		t.Fatal("Timeout waiting for parser message")
	}

	// Verify parser was added
	if len(parserRegistry.ParserList) != 1 {
		t.Errorf("Expected 1 parser after add, got %d", len(parserRegistry.ParserList))
	}

	// Test DELETE operation
	chParser <- model.KafkaParser{
		ID:   "delete",
		Name: "new-parser",
	}

	if !handleNextParserOp(chParser, parserRegistry) {
		t.Fatal("Timeout waiting for parser message")
	}

	// Verify parser was deleted
	if len(parserRegistry.ParserList) != 0 {
		t.Errorf("Expected 0 parsers after delete, got %d", len(parserRegistry.ParserList))
	}
}

// TestEventLoop_UpdateParser checks that an "update" message for an already
// registered parser keeps exactly one entry under the same name.
func TestEventLoop_UpdateParser(t *testing.T) {
	// Setup
	parserRegistry := &model.ParserRegistry{}

	// Add initial parser
	parserRegistry.Register("test-parser", model.Config{
		Name:   "test-parser",
		Prefix: "02",
		Length: 2,
	})

	chParser := make(chan model.KafkaParser, 10)

	// Test UPDATE operation
	chParser <- model.KafkaParser{
		ID:   "update",
		Name: "test-parser",
		Config: model.Config{
			Name:   "test-parser",
			Prefix: "03",
			Length: 3,
		},
	}

	if !handleNextParserOp(chParser, parserRegistry) {
		t.Fatal("Timeout waiting for parser message")
	}

	// Verify parser still exists (was updated, not deleted)
	if len(parserRegistry.ParserList) != 1 {
		t.Errorf("Expected 1 parser after update, got %d", len(parserRegistry.ParserList))
	}

	if _, exists := parserRegistry.ParserList["test-parser"]; !exists {
		t.Error("Parser should still exist after update")
	}
}

// TestEventLoop_MultipleParserOperations queues a mix of add, delete and update
// messages, applies them in order, and verifies the resulting registry content.
func TestEventLoop_MultipleParserOperations(t *testing.T) {
	// Setup
	parserRegistry := &model.ParserRegistry{}
	chParser := make(chan model.KafkaParser, 10)

	// Send multiple operations
	operations := []model.KafkaParser{
		{ID: "add", Name: "parser-1", Config: model.Config{Name: "parser-1", Prefix: "02", Length: 2}},
		{ID: "add", Name: "parser-2", Config: model.Config{Name: "parser-2", Prefix: "03", Length: 3}},
		{ID: "add", Name: "parser-3", Config: model.Config{Name: "parser-3", Prefix: "04", Length: 4}},
		{ID: "delete", Name: "parser-2"},
		{ID: "update", Name: "parser-1", Config: model.Config{Name: "parser-1", Prefix: "05", Length: 5}},
	}

	for _, op := range operations {
		chParser <- op
	}

	// Process all operations
	for i := range operations {
		if !handleNextParserOp(chParser, parserRegistry) {
			t.Fatalf("Timeout processing operation %d", i)
		}
	}

	// Verify final state
	if len(parserRegistry.ParserList) != 2 {
		t.Errorf("Expected 2 parsers after all operations, got %d", len(parserRegistry.ParserList))
	}

	// parser-1 should exist (updated)
	if _, exists := parserRegistry.ParserList["parser-1"]; !exists {
		t.Error("parser-1 should exist")
	}

	// parser-2 should not exist (deleted)
	if _, exists := parserRegistry.ParserList["parser-2"]; exists {
		t.Error("parser-2 should not exist")
	}

	// parser-3 should exist (added)
	if _, exists := parserRegistry.ParserList["parser-3"]; !exists {
		t.Error("parser-3 should exist")
	}
}

// TestEventLoop_ContextCancellation checks that a select over the context and
// two empty channels takes the ctx.Done branch once the context is cancelled.
func TestEventLoop_ContextCancellation(t *testing.T) {
	// Setup
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	chRaw := make(chan model.BeaconAdvertisement, 10)
	chParser := make(chan model.KafkaParser, 10)

	// Cancel immediately
	cancel()

	// Verify context is cancelled
	select {
	case <-ctx.Done():
		// Expected - context was cancelled
		return
	case msg := <-chRaw:
		t.Errorf("Should not receive raw messages after context cancellation, got: %+v", msg)
	case msg := <-chParser:
		t.Errorf("Should not receive parser messages after context cancellation, got: %+v", msg)
	case <-time.After(1 * time.Second):
		t.Error("Timeout - context cancellation should have been immediate")
	}
}

// TestEventLoop_ChannelBuffering checks that buffered channels accept a burst
// of raw and parser messages without a receiver and report the queued count.
func TestEventLoop_ChannelBuffering(t *testing.T) {
	const (
		rawCount    = 100
		parserCount = 10
	)

	// Create buffered channels (like in main)
	chRaw := make(chan model.BeaconAdvertisement, 2000)
	chParser := make(chan model.KafkaParser, 200)

	// Send multiple messages without blocking
	for i := 0; i < rawCount; i++ {
		chRaw <- model.BeaconAdvertisement{
			ID:   "test-beacon",
			Data: "020106",
		}
	}

	// Verify all messages are buffered
	if len(chRaw) != rawCount {
		t.Errorf("Expected 100 messages in buffer, got %d", len(chRaw))
	}

	// Send parser updates
	for i := 0; i < parserCount; i++ {
		// Names run parser-A, parser-B, ... so each message is distinct.
		name := "parser-" + string(rune('A'+i))
		chParser <- model.KafkaParser{
			ID:   "add",
			Name: name,
			Config: model.Config{
				Name:   name,
				Prefix: "02",
				Length: 2,
			},
		}
	}

	// Verify all parser messages are buffered
	if len(chParser) != parserCount {
		t.Errorf("Expected 10 parser messages in buffer, got %d", len(chParser))
	}
}

// TestEventLoop_ParserAndRawChannels checks that one select loop drains both
// the raw and the parser channel when each holds a single pending message.
func TestEventLoop_ParserAndRawChannels(t *testing.T) {
	// Setup
	chRaw := make(chan model.BeaconAdvertisement, 10)
	chParser := make(chan model.KafkaParser, 10)

	// Send both raw and parser messages
	chRaw <- model.BeaconAdvertisement{
		ID:   "test-beacon",
		Data: "020106",
	}
	chParser <- model.KafkaParser{
		ID:   "add",
		Name: "test-parser",
		Config: model.Config{
			Name:   "test-parser",
			Prefix: "02",
			Length: 2,
		},
	}

	// Process both messages
	processedRaw := false
	processedParser := false

	for i := 0; i < 2; i++ {
		select {
		case <-chRaw:
			processedRaw = true
		case <-chParser:
			processedParser = true
		case <-time.After(1 * time.Second):
			t.Fatal("Timeout waiting for messages")
		}
	}

	if !processedRaw {
		t.Error("Raw message should have been processed")
	}

	if !processedParser {
		t.Error("Parser message should have been processed")
	}
}