package bridge import ( "context" "encoding/json" "fmt" "strings" "time" "github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/model" "github.com/segmentio/kafka-go" mqtt "github.com/eclipse/paho.mqtt.golang" ) // mqtthandler is extracted from main.go for testing purposes func mqtthandler(writer kafkaWriter, topic string, message []byte, appState *appcontext.AppState) { hostname := strings.Split(topic, "/")[1] msgStr := string(message) if strings.HasPrefix(msgStr, "[") { var readings []model.RawReading err := json.Unmarshal(message, &readings) if err != nil { fmt.Println("Error parsing JSON:", err) return } for _, reading := range readings { if reading.Type == "Gateway" { continue } val, ok := appState.BeaconExists(reading.MAC) if !ok { continue } adv := model.BeaconAdvertisement{ ID: val, Hostname: hostname, MAC: reading.MAC, RSSI: int64(reading.RSSI), Data: reading.RawData, } encodedMsg, err := json.Marshal(adv) if err != nil { fmt.Println("Error in marshaling:", err) break } msg := kafka.Message{ Value: encodedMsg, } err = writer.WriteMessages(context.Background(), msg) if err != nil { fmt.Println("Error in writing to Kafka:", err) time.Sleep(1 * time.Second) break } } } } // kafkaWriter interface defines the methods we need from kafka.Writer type kafkaWriter interface { WriteMessages(ctx context.Context, msgs ...kafka.Message) error } // messagePubHandler is extracted from main.go for testing purposes var messagePubHandler = func(msg mqtt.Message, writer kafkaWriter, appState *appcontext.AppState) { mqtthandler(writer, msg.Topic(), msg.Payload(), appState) }