You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

76 lines
1.8 KiB

  1. package bridge
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "strings"
  7. "time"
  8. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  9. "github.com/AFASystems/presence/internal/pkg/model"
  10. "github.com/segmentio/kafka-go"
  11. mqtt "github.com/eclipse/paho.mqtt.golang"
  12. )
  13. // mqtthandler is extracted from main.go for testing purposes
  14. func mqtthandler(writer kafkaWriter, topic string, message []byte, appState *appcontext.AppState) {
  15. hostname := strings.Split(topic, "/")[1]
  16. msgStr := string(message)
  17. if strings.HasPrefix(msgStr, "[") {
  18. var readings []model.RawReading
  19. err := json.Unmarshal(message, &readings)
  20. if err != nil {
  21. fmt.Println("Error parsing JSON:", err)
  22. return
  23. }
  24. for _, reading := range readings {
  25. if reading.Type == "Gateway" {
  26. continue
  27. }
  28. val, ok := appState.BeaconExists(reading.MAC)
  29. if !ok {
  30. continue
  31. }
  32. adv := model.BeaconAdvertisement{
  33. ID: val,
  34. Hostname: hostname,
  35. MAC: reading.MAC,
  36. RSSI: int64(reading.RSSI),
  37. Data: reading.RawData,
  38. }
  39. encodedMsg, err := json.Marshal(adv)
  40. if err != nil {
  41. fmt.Println("Error in marshaling:", err)
  42. break
  43. }
  44. msg := kafka.Message{
  45. Value: encodedMsg,
  46. }
  47. err = writer.WriteMessages(context.Background(), msg)
  48. if err != nil {
  49. fmt.Println("Error in writing to Kafka:", err)
  50. time.Sleep(1 * time.Second)
  51. break
  52. }
  53. }
  54. }
  55. }
  56. // kafkaWriter interface defines the methods we need from kafka.Writer
  57. type kafkaWriter interface {
  58. WriteMessages(ctx context.Context, msgs ...kafka.Message) error
  59. }
  60. // messagePubHandler is extracted from main.go for testing purposes
  61. var messagePubHandler = func(msg mqtt.Message, writer kafkaWriter, appState *appcontext.AppState) {
  62. mqtthandler(writer, msg.Topic(), msg.Payload(), appState)
  63. }