|
- package decoder
-
- import (
- "context"
- "testing"
- "time"
-
- "github.com/AFASystems/presence/internal/pkg/common/appcontext"
- "github.com/AFASystems/presence/internal/pkg/model"
- )
-
- func TestEventLoop_RawMessageProcessing(t *testing.T) {
- // Setup
- appState := appcontext.NewAppState()
- mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
- parserRegistry := &model.ParserRegistry{}
-
- chRaw := make(chan model.BeaconAdvertisement, 10)
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- // Create a test message
- msg := model.BeaconAdvertisement{
- ID: "test-beacon",
- Data: "020106",
- }
-
- // Simulate event loop processing
- go func() {
- for {
- select {
- case <-ctx.Done():
- return
- case m := <-chRaw:
- processIncoming(m, appState, mockWriter, parserRegistry)
- }
- }
- }()
-
- // Send message
- chRaw <- msg
-
- // Give it time to process
- time.Sleep(100 * time.Millisecond)
-
- // Cancel context
- cancel()
-
- // Verify message was processed (even if no parser matched, processIncoming was called)
- // We just verify no panic occurred
- }
-
- func TestEventLoop_ParserRegistryUpdates(t *testing.T) {
- // Setup
- appState := appcontext.NewAppState()
- parserRegistry := &model.ParserRegistry{}
-
- chParser := make(chan model.KafkaParser, 10)
-
- // Test ADD operation
- addMsg := model.KafkaParser{
- ID: "add",
- Name: "new-parser",
- Config: model.Config{
- Name: "new-parser",
- Prefix: "02",
- Length: 2,
- },
- }
-
- chParser <- addMsg
-
- // Simulate event loop handling
- select {
- case msg := <-chParser:
- switch msg.ID {
- case "add":
- config := msg.Config
- parserRegistry.Register(config.Name, config)
- case "delete":
- parserRegistry.Unregister(msg.Name)
- case "update":
- config := msg.Config
- parserRegistry.Register(config.Name, config)
- }
- case <-time.After(1 * time.Second):
- t.Fatal("Timeout waiting for parser message")
- }
-
- // Verify parser was added
- if len(parserRegistry.ParserList) != 1 {
- t.Errorf("Expected 1 parser after add, got %d", len(parserRegistry.ParserList))
- }
-
- // Test DELETE operation
- deleteMsg := model.KafkaParser{
- ID: "delete",
- Name: "new-parser",
- }
-
- chParser <- deleteMsg
-
- select {
- case msg := <-chParser:
- switch msg.ID {
- case "add":
- config := msg.Config
- parserRegistry.Register(config.Name, config)
- case "delete":
- parserRegistry.Unregister(msg.Name)
- case "update":
- config := msg.Config
- parserRegistry.Register(config.Name, config)
- }
- case <-time.After(1 * time.Second):
- t.Fatal("Timeout waiting for parser message")
- }
-
- // Verify parser was deleted
- if len(parserRegistry.ParserList) != 0 {
- t.Errorf("Expected 0 parsers after delete, got %d", len(parserRegistry.ParserList))
- }
- }
-
- func TestEventLoop_UpdateParser(t *testing.T) {
- // Setup
- appState := appcontext.NewAppState()
- parserRegistry := &model.ParserRegistry{}
-
- // Add initial parser
- parserRegistry.Register("test-parser", model.Config{
- Name: "test-parser",
- Prefix: "02",
- Length: 2,
- })
-
- chParser := make(chan model.KafkaParser, 10)
-
- // Test UPDATE operation
- updateMsg := model.KafkaParser{
- ID: "update",
- Name: "test-parser",
- Config: model.Config{
- Name: "test-parser",
- Prefix: "03",
- Length: 3,
- },
- }
-
- chParser <- updateMsg
-
- // Simulate event loop handling
- select {
- case msg := <-chParser:
- switch msg.ID {
- case "add":
- config := msg.Config
- parserRegistry.Register(config.Name, config)
- case "delete":
- parserRegistry.Unregister(msg.Name)
- case "update":
- config := msg.Config
- parserRegistry.Register(config.Name, config)
- }
- case <-time.After(1 * time.Second):
- t.Fatal("Timeout waiting for parser message")
- }
-
- // Verify parser still exists (was updated, not deleted)
- if len(parserRegistry.ParserList) != 1 {
- t.Errorf("Expected 1 parser after update, got %d", len(parserRegistry.ParserList))
- }
-
- if _, exists := parserRegistry.ParserList["test-parser"]; !exists {
- t.Error("Parser should still exist after update")
- }
- }
-
- func TestEventLoop_MultipleParserOperations(t *testing.T) {
- // Setup
- appState := appcontext.NewAppState()
- parserRegistry := &model.ParserRegistry{}
-
- chParser := make(chan model.KafkaParser, 10)
-
- // Send multiple operations
- operations := []model.KafkaParser{
- {ID: "add", Name: "parser-1", Config: model.Config{Name: "parser-1", Prefix: "02", Length: 2}},
- {ID: "add", Name: "parser-2", Config: model.Config{Name: "parser-2", Prefix: "03", Length: 3}},
- {ID: "add", Name: "parser-3", Config: model.Config{Name: "parser-3", Prefix: "04", Length: 4}},
- {ID: "delete", Name: "parser-2"},
- {ID: "update", Name: "parser-1", Config: model.Config{Name: "parser-1", Prefix: "05", Length: 5}},
- }
-
- for _, op := range operations {
- chParser <- op
- }
-
- // Process all operations
- for i := 0; i < len(operations); i++ {
- select {
- case msg := <-chParser:
- switch msg.ID {
- case "add":
- config := msg.Config
- parserRegistry.Register(config.Name, config)
- case "delete":
- parserRegistry.Unregister(msg.Name)
- case "update":
- config := msg.Config
- parserRegistry.Register(config.Name, config)
- }
- case <-time.After(1 * time.Second):
- t.Fatalf("Timeout processing operation %d", i)
- }
- }
-
- // Verify final state
- if len(parserRegistry.ParserList) != 2 {
- t.Errorf("Expected 2 parsers after all operations, got %d", len(parserRegistry.ParserList))
- }
-
- // parser-1 should exist (updated)
- if _, exists := parserRegistry.ParserList["parser-1"]; !exists {
- t.Error("parser-1 should exist")
- }
-
- // parser-2 should not exist (deleted)
- if _, exists := parserRegistry.ParserList["parser-2"]; exists {
- t.Error("parser-2 should not exist")
- }
-
- // parser-3 should exist (added)
- if _, exists := parserRegistry.ParserList["parser-3"]; !exists {
- t.Error("parser-3 should exist")
- }
- }
-
- func TestEventLoop_ContextCancellation(t *testing.T) {
- // Setup
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- chRaw := make(chan model.BeaconAdvertisement, 10)
- chParser := make(chan model.KafkaParser, 10)
-
- // Cancel immediately
- cancel()
-
- // Verify context is cancelled
- select {
- case <-ctx.Done():
- // Expected - context was cancelled
- return
- case msg := <-chRaw:
- t.Errorf("Should not receive raw messages after context cancellation, got: %+v", msg)
- case msg := <-chParser:
- t.Errorf("Should not receive parser messages after context cancellation, got: %+v", msg)
- case <-time.After(1 * time.Second):
- t.Error("Timeout - context cancellation should have been immediate")
- }
- }
-
- func TestEventLoop_ChannelBuffering(t *testing.T) {
- // Setup
- appState := appcontext.NewAppState()
- mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
- parserRegistry := &model.ParserRegistry{}
-
- // Create buffered channels (like in main)
- chRaw := make(chan model.BeaconAdvertisement, 2000)
- chParser := make(chan model.KafkaParser, 200)
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- // Send multiple messages without blocking
- for i := 0; i < 100; i++ {
- msg := model.BeaconAdvertisement{
- ID: "test-beacon",
- Data: "020106",
- }
- chRaw <- msg
- }
-
- // Verify all messages are buffered
- if len(chRaw) != 100 {
- t.Errorf("Expected 100 messages in buffer, got %d", len(chRaw))
- }
-
- // Send parser updates
- for i := 0; i < 10; i++ {
- msg := model.KafkaParser{
- ID: "add",
- Name: "parser-" + string(rune('A'+i)),
- Config: model.Config{
- Name: "parser-" + string(rune('A'+i)),
- Prefix: "02",
- Length: 2,
- },
- }
- chParser <- msg
- }
-
- // Verify all parser messages are buffered
- if len(chParser) != 10 {
- t.Errorf("Expected 10 parser messages in buffer, got %d", len(chParser))
- }
-
- // Cancel context
- cancel()
- }
-
- func TestEventLoop_ParserAndRawChannels(t *testing.T) {
- // Setup
- appState := appcontext.NewAppState()
- mockWriter := &MockKafkaWriter{Messages: []kafka.Message{}}
- parserRegistry := &model.ParserRegistry{}
-
- chRaw := make(chan model.BeaconAdvertisement, 10)
- chParser := make(chan model.KafkaParser, 10)
-
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
-
- // Send both raw and parser messages
- rawMsg := model.BeaconAdvertisement{
- ID: "test-beacon",
- Data: "020106",
- }
-
- parserMsg := model.KafkaParser{
- ID: "add",
- Name: "test-parser",
- Config: model.Config{
- Name: "test-parser",
- Prefix: "02",
- Length: 2,
- },
- }
-
- chRaw <- rawMsg
- chParser <- parserMsg
-
- // Process both messages
- processedRaw := false
- processedParser := false
-
- for i := 0; i < 2; i++ {
- select {
- case <-chRaw:
- processedRaw = true
- case <-chParser:
- processedParser = true
- case <-time.After(1 * time.Second):
- t.Fatal("Timeout waiting for messages")
- }
- }
-
- if !processedRaw {
- t.Error("Raw message should have been processed")
- }
-
- if !processedParser {
- t.Error("Parser message should have been processed")
- }
-
- cancel()
- }
|