No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.
 
 
 
 

143 líneas
4.2 KiB

  1. package bridge
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log/slog"
  7. "sync"
  8. "time"
  9. "github.com/AFASystems/presence/internal/pkg/bridge"
  10. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  11. "github.com/AFASystems/presence/internal/pkg/config"
  12. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  13. "github.com/AFASystems/presence/internal/pkg/logger"
  14. "github.com/AFASystems/presence/internal/pkg/model"
  15. mqtt "github.com/eclipse/paho.mqtt.golang"
  16. "github.com/segmentio/kafka-go"
  17. )
  18. // BridgeApp holds dependencies for the bridge service (MQTT <-> Kafka).
  19. type BridgeApp struct {
  20. Cfg *config.Config
  21. KafkaManager *kafkaclient.KafkaManager
  22. AppState *appcontext.AppState
  23. MQTT *bridge.MQTTClient
  24. ChApi chan model.ApiUpdate
  25. ChAlert chan model.Alert
  26. ChMqtt chan []model.Tracker
  27. Cleanup func()
  28. wg sync.WaitGroup
  29. }
  30. // New creates a BridgeApp with Kafka readers (apibeacons, alert, mqtt), writer (rawbeacons), and MQTT client.
  31. func New(cfg *config.Config) (*BridgeApp, error) {
  32. appState := appcontext.NewAppState()
  33. kafkaManager := kafkaclient.InitKafkaManager()
  34. srvLogger, cleanup := logger.CreateLogger("bridge.log")
  35. slog.SetDefault(srvLogger)
  36. readerTopics := []string{"apibeacons", "alert", "mqtt"}
  37. writerTopics := []string{"rawbeacons", "healthbridge"}
  38. kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "bridge", readerTopics)
  39. kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)
  40. slog.Info("bridge service initialized", "readers", readerTopics, "writers", writerTopics)
  41. writer := kafkaManager.GetWriter("rawbeacons")
  42. mqttClient, err := bridge.NewMQTTClient(cfg, func(m mqtt.Message) {
  43. bridge.HandleMQTTMessage(m.Topic(), m.Payload(), appState, writer)
  44. })
  45. if err != nil {
  46. cleanup()
  47. return nil, err
  48. }
  49. mqttClient.Subscribe()
  50. return &BridgeApp{
  51. Cfg: cfg,
  52. KafkaManager: kafkaManager,
  53. AppState: appState,
  54. MQTT: mqttClient,
  55. ChApi: make(chan model.ApiUpdate, config.SMALL_CHANNEL_SIZE),
  56. ChAlert: make(chan model.Alert, config.SMALL_CHANNEL_SIZE),
  57. ChMqtt: make(chan []model.Tracker, config.SMALL_CHANNEL_SIZE),
  58. Cleanup: cleanup,
  59. }, nil
  60. }
  61. // Run starts Kafka consumers and the event loop until ctx is cancelled.
  62. func (a *BridgeApp) Run(ctx context.Context) {
  63. a.wg.Add(3)
  64. go kafkaclient.Consume(a.KafkaManager.GetReader("apibeacons"), a.ChApi, ctx, &a.wg)
  65. go kafkaclient.Consume(a.KafkaManager.GetReader("alert"), a.ChAlert, ctx, &a.wg)
  66. go kafkaclient.Consume(a.KafkaManager.GetReader("mqtt"), a.ChMqtt, ctx, &a.wg)
  67. healthTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL)
  68. defer healthTicker.Stop()
  69. for {
  70. select {
  71. case <-ctx.Done():
  72. return
  73. case <-healthTicker.C:
  74. health, err := a.AppState.GetBridgeHealth(a.KafkaManager)
  75. if err != nil {
  76. slog.Error("getting bridge health", "err", err)
  77. continue
  78. }
  79. m := kafka.Message{
  80. Value: health,
  81. }
  82. if err := kafkaclient.Write(ctx, a.KafkaManager.GetWriter("healthbridge"), m); err != nil {
  83. slog.Error("writing bridge health", "err", err)
  84. continue
  85. }
  86. case msg := <-a.ChApi:
  87. switch msg.Method {
  88. case "POST":
  89. fmt.Println("adding beacon to lookup", "mac", msg.MAC, "id", msg.ID)
  90. a.AppState.AddBeaconToLookup(msg.MAC, msg.ID)
  91. slog.Info("beacon added to lookup", "id", msg.ID)
  92. case "DELETE":
  93. if msg.MAC == "all" {
  94. a.AppState.CleanLookup()
  95. slog.Info("lookup cleared")
  96. continue
  97. }
  98. a.AppState.RemoveBeaconFromLookup(msg.MAC)
  99. slog.Info("beacon removed from lookup", "mac", msg.MAC)
  100. }
  101. case msg := <-a.ChAlert:
  102. p, err := json.Marshal(msg)
  103. if err != nil {
  104. slog.Error("marshaling alert", "err", err)
  105. continue
  106. }
  107. a.MQTT.Client.Publish("/alerts", 0, true, p)
  108. case msg := <-a.ChMqtt:
  109. p, err := json.Marshal(msg)
  110. if err != nil {
  111. slog.Error("marshaling trackers", "err", err)
  112. continue
  113. }
  114. a.MQTT.Client.Publish("/trackers", 0, true, p)
  115. }
  116. }
  117. }
  118. // Shutdown disconnects MQTT, waits for consumers, and cleans up.
  119. func (a *BridgeApp) Shutdown() {
  120. a.wg.Wait()
  121. if a.MQTT != nil {
  122. a.MQTT.Disconnect()
  123. }
  124. a.KafkaManager.CleanKafkaReaders()
  125. a.KafkaManager.CleanKafkaWriters()
  126. if a.Cleanup != nil {
  127. a.Cleanup()
  128. }
  129. slog.Info("bridge service shutdown complete")
  130. }