package bridge import ( "context" "encoding/json" "fmt" "os" "testing" "time" "github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/model" "github.com/segmentio/kafka-go" ) // TestIntegration_EndToEnd tests the complete flow from MQTT message to Kafka // This test requires real Kafka and doesn't mock the writer func TestIntegration_EndToEnd(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } // Check if Kafka is available kafkaURL := os.Getenv("KAFKA_URL") if kafkaURL == "" { kafkaURL = "localhost:9092" } // Create a test topic testTopic := "test-rawbeacons-" + time.Now().Format("20060102150405") // Setup appState := appcontext.NewAppState() appState.AddBeaconToLookup("AA:BB:CC:DD:EE:FF", "test-beacon-1") // Create real Kafka writer writer := kafka.NewWriter(kafka.WriterConfig{ Brokers: []string{kafkaURL}, Topic: testTopic, }) defer writer.Close() // Create Kafka reader to verify messages reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{kafkaURL}, Topic: testTopic, GroupID: "test-group-" + time.Now().Format("20060102150405"), }) defer reader.Close() // Create a test message reading := model.RawReading{ Timestamp: time.Now().Format(time.RFC3339), Type: "BLE", MAC: "AA:BB:CC:DD:EE:FF", RSSI: -65, RawData: "0201060302A0", } messageBytes, _ := json.Marshal([]model.RawReading{reading}) // Execute mqtthandler(writer, "publish_out/gateway-1", messageBytes, appState) // Give Kafka time to propagate time.Sleep(1 * time.Second) // Read back the message ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() msg, err := reader.ReadMessage(ctx) if err != nil { t.Fatalf("Failed to read message from Kafka: %v", err) } // Verify var adv model.BeaconAdvertisement err = json.Unmarshal(msg.Value, &adv) if err != nil { t.Fatalf("Failed to unmarshal beacon advertisement: %v", err) } if adv.ID != "test-beacon-1" { t.Errorf("Expected ID 'test-beacon-1', got '%s'", adv.ID) } if adv.Hostname != "gateway-1" { t.Errorf("Expected hostname 'gateway-1', got '%s'", adv.Hostname) } if adv.MAC != "AA:BB:CC:DD:EE:FF" { t.Errorf("Expected MAC 'AA:BB:CC:DD:EE:FF', got '%s'", adv.MAC) } } // TestIntegration_MultipleMessages tests handling multiple messages in sequence func TestIntegration_MultipleMessages(t *testing.T) { if testing.Short() { t.Skip("Skipping integration test in short mode") } kafkaURL := os.Getenv("KAFKA_URL") if kafkaURL == "" { kafkaURL = "localhost:9092" } testTopic := "test-rawbeacons-multi-" + time.Now().Format("20060102150405") // Setup appState := appcontext.NewAppState() appState.AddBeaconToLookup("AA:BB:CC:DD:EE:FF", "test-beacon-1") appState.AddBeaconToLookup("11:22:33:44:55:66", "test-beacon-2") writer := kafka.NewWriter(kafka.WriterConfig{ Brokers: []string{kafkaURL}, Topic: testTopic, }) defer writer.Close() reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{kafkaURL}, Topic: testTopic, GroupID: "test-group-multi-" + time.Now().Format("20060102150405"), MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB }) defer reader.Close() // Send multiple messages for i := 0; i < 5; i++ { reading := model.RawReading{ Timestamp: time.Now().Format(time.RFC3339), Type: "BLE", MAC: "AA:BB:CC:DD:EE:FF", RSSI: -65 - i, RawData: "0201060302A0", } messageBytes, _ := json.Marshal([]model.RawReading{reading}) mqtthandler(writer, "publish_out/gateway-1", messageBytes, appState) } // Give Kafka time to propagate time.Sleep(2 * time.Second) // Read and verify messages ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() messageCount := 0 for i := 0; i < 5; i++ { msg, err := reader.ReadMessage(ctx) if err != nil { t.Fatalf("Failed to read message %d from Kafka: %v", i+1, err) } var adv model.BeaconAdvertisement err = json.Unmarshal(msg.Value, &adv) if err != nil { t.Fatalf("Failed to unmarshal beacon advertisement %d: %v", i+1, err) } if adv.ID != "test-beacon-1" { t.Errorf("Message %d: Expected ID 'test-beacon-1', got '%s'", i+1, adv.ID) } messageCount++ } if messageCount != 5 { t.Errorf("Expected 5 messages, got %d", messageCount) } } // TestIntegration_AppStateSequentialOperations tests sequential operations on AppState func TestIntegration_AppStateSequentialOperations(t *testing.T) { // Setup appState := appcontext.NewAppState() // Test sequential beacon additions for i := 0; i < 100; i++ { mac := generateMAC(i) id := generateID(i) appState.AddBeaconToLookup(mac, id) } // Verify all beacons were added for i := 0; i < 100; i++ { mac := generateMAC(i) expectedID := generateID(i) id, exists := appState.BeaconExists(mac) if !exists { t.Errorf("Beacon with MAC %s should exist", mac) } if id != expectedID { t.Errorf("Expected ID '%s', got '%s'", expectedID, id) } } // Test sequential deletions for i := 0; i < 50; i++ { mac := generateMAC(i) appState.RemoveBeaconFromLookup(mac) } // Verify deletions for i := 0; i < 50; i++ { mac := generateMAC(i) _, exists := appState.BeaconExists(mac) if exists { t.Errorf("Beacon with MAC %s should have been deleted", mac) } } // Verify remaining beacons still exist for i := 50; i < 100; i++ { mac := generateMAC(i) _, exists := appState.BeaconExists(mac) if !exists { t.Errorf("Beacon with MAC %s should still exist", mac) } } } func generateMAC(index int) string { return fmt.Sprintf("AA:BB:CC:DD:%02X:%02X", (index>>8)&0xFF, index&0xFF) } func generateID(index int) string { return fmt.Sprintf("beacon-%d", index) } // TestIntegration_CleanLookup tests the CleanLookup functionality func TestIntegration_CleanLookup(t *testing.T) { // Setup appState := appcontext.NewAppState() // Add multiple beacons for i := 0; i < 10; i++ { mac := generateMAC(i) id := generateID(i) appState.AddBeaconToLookup(mac, id) } // Verify they were added for i := 0; i < 10; i++ { mac := generateMAC(i) _, exists := appState.BeaconExists(mac) if !exists { t.Errorf("Beacon with MAC %s should exist before cleanup", mac) } } // Clean lookup appState.CleanLookup() // Verify all beacons were removed for i := 0; i < 10; i++ { mac := generateMAC(i) _, exists := appState.BeaconExists(mac) if exists { t.Errorf("Beacon with MAC %s should have been removed by cleanup", mac) } } }