|
- package bridge
-
- import (
- "context"
- "encoding/json"
- "fmt"
- "os"
- "testing"
- "time"
-
- "github.com/AFASystems/presence/internal/pkg/common/appcontext"
- "github.com/AFASystems/presence/internal/pkg/model"
- "github.com/segmentio/kafka-go"
- )
-
- // TestIntegration_EndToEnd tests the complete flow from MQTT message to Kafka
- // This test requires real Kafka and doesn't mock the writer
- func TestIntegration_EndToEnd(t *testing.T) {
- if testing.Short() {
- t.Skip("Skipping integration test in short mode")
- }
-
- // Check if Kafka is available
- kafkaURL := os.Getenv("KAFKA_URL")
- if kafkaURL == "" {
- kafkaURL = "localhost:9092"
- }
-
- // Create a test topic
- testTopic := "test-rawbeacons-" + time.Now().Format("20060102150405")
-
- // Setup
- appState := appcontext.NewAppState()
- appState.AddBeaconToLookup("AA:BB:CC:DD:EE:FF", "test-beacon-1")
-
- // Create real Kafka writer
- writer := kafka.NewWriter(kafka.WriterConfig{
- Brokers: []string{kafkaURL},
- Topic: testTopic,
- })
- defer writer.Close()
-
- // Create Kafka reader to verify messages
- reader := kafka.NewReader(kafka.ReaderConfig{
- Brokers: []string{kafkaURL},
- Topic: testTopic,
- GroupID: "test-group-" + time.Now().Format("20060102150405"),
- })
- defer reader.Close()
-
- // Create a test message
- reading := model.RawReading{
- Timestamp: time.Now().Format(time.RFC3339),
- Type: "BLE",
- MAC: "AA:BB:CC:DD:EE:FF",
- RSSI: -65,
- RawData: "0201060302A0",
- }
-
- messageBytes, _ := json.Marshal([]model.RawReading{reading})
-
- // Execute
- mqtthandler(writer, "publish_out/gateway-1", messageBytes, appState)
-
- // Give Kafka time to propagate
- time.Sleep(1 * time.Second)
-
- // Read back the message
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- msg, err := reader.ReadMessage(ctx)
- if err != nil {
- t.Fatalf("Failed to read message from Kafka: %v", err)
- }
-
- // Verify
- var adv model.BeaconAdvertisement
- err = json.Unmarshal(msg.Value, &adv)
- if err != nil {
- t.Fatalf("Failed to unmarshal beacon advertisement: %v", err)
- }
-
- if adv.ID != "test-beacon-1" {
- t.Errorf("Expected ID 'test-beacon-1', got '%s'", adv.ID)
- }
-
- if adv.Hostname != "gateway-1" {
- t.Errorf("Expected hostname 'gateway-1', got '%s'", adv.Hostname)
- }
-
- if adv.MAC != "AA:BB:CC:DD:EE:FF" {
- t.Errorf("Expected MAC 'AA:BB:CC:DD:EE:FF', got '%s'", adv.MAC)
- }
- }
-
- // TestIntegration_MultipleMessages tests handling multiple messages in sequence
- func TestIntegration_MultipleMessages(t *testing.T) {
- if testing.Short() {
- t.Skip("Skipping integration test in short mode")
- }
-
- kafkaURL := os.Getenv("KAFKA_URL")
- if kafkaURL == "" {
- kafkaURL = "localhost:9092"
- }
-
- testTopic := "test-rawbeacons-multi-" + time.Now().Format("20060102150405")
-
- // Setup
- appState := appcontext.NewAppState()
- appState.AddBeaconToLookup("AA:BB:CC:DD:EE:FF", "test-beacon-1")
- appState.AddBeaconToLookup("11:22:33:44:55:66", "test-beacon-2")
-
- writer := kafka.NewWriter(kafka.WriterConfig{
- Brokers: []string{kafkaURL},
- Topic: testTopic,
- })
- defer writer.Close()
-
- reader := kafka.NewReader(kafka.ReaderConfig{
- Brokers: []string{kafkaURL},
- Topic: testTopic,
- GroupID: "test-group-multi-" + time.Now().Format("20060102150405"),
- MinBytes: 10e3, // 10KB
- MaxBytes: 10e6, // 10MB
- })
- defer reader.Close()
-
- // Send multiple messages
- for i := 0; i < 5; i++ {
- reading := model.RawReading{
- Timestamp: time.Now().Format(time.RFC3339),
- Type: "BLE",
- MAC: "AA:BB:CC:DD:EE:FF",
- RSSI: -65 - i,
- RawData: "0201060302A0",
- }
-
- messageBytes, _ := json.Marshal([]model.RawReading{reading})
- mqtthandler(writer, "publish_out/gateway-1", messageBytes, appState)
- }
-
- // Give Kafka time to propagate
- time.Sleep(2 * time.Second)
-
- // Read and verify messages
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
- defer cancel()
-
- messageCount := 0
- for i := 0; i < 5; i++ {
- msg, err := reader.ReadMessage(ctx)
- if err != nil {
- t.Fatalf("Failed to read message %d from Kafka: %v", i+1, err)
- }
-
- var adv model.BeaconAdvertisement
- err = json.Unmarshal(msg.Value, &adv)
- if err != nil {
- t.Fatalf("Failed to unmarshal beacon advertisement %d: %v", i+1, err)
- }
-
- if adv.ID != "test-beacon-1" {
- t.Errorf("Message %d: Expected ID 'test-beacon-1', got '%s'", i+1, adv.ID)
- }
-
- messageCount++
- }
-
- if messageCount != 5 {
- t.Errorf("Expected 5 messages, got %d", messageCount)
- }
- }
-
- // TestIntegration_AppStateSequentialOperations tests sequential operations on AppState
- func TestIntegration_AppStateSequentialOperations(t *testing.T) {
- // Setup
- appState := appcontext.NewAppState()
-
- // Test sequential beacon additions
- for i := 0; i < 100; i++ {
- mac := generateMAC(i)
- id := generateID(i)
- appState.AddBeaconToLookup(mac, id)
- }
-
- // Verify all beacons were added
- for i := 0; i < 100; i++ {
- mac := generateMAC(i)
- expectedID := generateID(i)
-
- id, exists := appState.BeaconExists(mac)
- if !exists {
- t.Errorf("Beacon with MAC %s should exist", mac)
- }
-
- if id != expectedID {
- t.Errorf("Expected ID '%s', got '%s'", expectedID, id)
- }
- }
-
- // Test sequential deletions
- for i := 0; i < 50; i++ {
- mac := generateMAC(i)
- appState.RemoveBeaconFromLookup(mac)
- }
-
- // Verify deletions
- for i := 0; i < 50; i++ {
- mac := generateMAC(i)
- _, exists := appState.BeaconExists(mac)
- if exists {
- t.Errorf("Beacon with MAC %s should have been deleted", mac)
- }
- }
-
- // Verify remaining beacons still exist
- for i := 50; i < 100; i++ {
- mac := generateMAC(i)
- _, exists := appState.BeaconExists(mac)
- if !exists {
- t.Errorf("Beacon with MAC %s should still exist", mac)
- }
- }
- }
-
// generateMAC builds a test MAC address with the fixed prefix AA:BB:CC:DD
// whose last two octets are the low 16 bits of index, high byte first.
func generateMAC(index int) string {
	high := byte(index >> 8) // conversion keeps only the low 8 bits
	low := byte(index)
	return fmt.Sprintf("AA:BB:CC:DD:%02X:%02X", high, low)
}
-
// generateID returns the test beacon ID for index, in the form
// "beacon-<index>".
func generateID(index int) string {
	id := fmt.Sprintf("beacon-%d", index)
	return id
}
-
- // TestIntegration_CleanLookup tests the CleanLookup functionality
- func TestIntegration_CleanLookup(t *testing.T) {
- // Setup
- appState := appcontext.NewAppState()
-
- // Add multiple beacons
- for i := 0; i < 10; i++ {
- mac := generateMAC(i)
- id := generateID(i)
- appState.AddBeaconToLookup(mac, id)
- }
-
- // Verify they were added
- for i := 0; i < 10; i++ {
- mac := generateMAC(i)
- _, exists := appState.BeaconExists(mac)
- if !exists {
- t.Errorf("Beacon with MAC %s should exist before cleanup", mac)
- }
- }
-
- // Clean lookup
- appState.CleanLookup()
-
- // Verify all beacons were removed
- for i := 0; i < 10; i++ {
- mac := generateMAC(i)
- _, exists := appState.BeaconExists(mac)
- if exists {
- t.Errorf("Beacon with MAC %s should have been removed by cleanup", mac)
- }
- }
- }
|