Nie możesz wybrać więcej niż 25 tematów. Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.
 
 
 
 

145 wierszy
4.2 KiB

  1. package bridge
  2. import (
  3. "context"
  4. "encoding/json"
  5. "log/slog"
  6. "sync"
  7. "time"
  8. "github.com/AFASystems/presence/internal/pkg/bridge"
  9. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  10. "github.com/AFASystems/presence/internal/pkg/config"
  11. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  12. "github.com/AFASystems/presence/internal/pkg/logger"
  13. "github.com/AFASystems/presence/internal/pkg/model"
  14. mqtt "github.com/eclipse/paho.mqtt.golang"
  15. "github.com/segmentio/kafka-go"
  16. )
  17. // BridgeApp holds dependencies for the bridge service (MQTT <-> Kafka).
  18. type BridgeApp struct {
  19. Cfg *config.Config
  20. KafkaManager *kafkaclient.KafkaManager
  21. AppState *appcontext.AppState
  22. MQTT *bridge.MQTTClient
  23. ChApi chan model.ApiUpdate
  24. ChAlert chan model.Alert
  25. ChMqtt chan []model.Tracker
  26. Cleanup func()
  27. wg sync.WaitGroup
  28. }
  29. // New creates a BridgeApp with Kafka readers (apibeacons, alert, mqtt), writer (rawbeacons), and MQTT client.
  30. func New(cfg *config.Config, ctx context.Context) (*BridgeApp, error) {
  31. appState := appcontext.NewAppState()
  32. kafkaManager := kafkaclient.InitKafkaManager()
  33. srvLogger, cleanup := logger.CreateLogger("bridge.log")
  34. slog.SetDefault(srvLogger)
  35. readerTopics := []string{"apibeacons", "alert", "mqtt"}
  36. writerTopics := []string{"rawbeacons", "healthbridge"}
  37. kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "bridge", readerTopics)
  38. kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)
  39. slog.Info("bridge service initialized", "readers", readerTopics, "writers", writerTopics)
  40. writer := kafkaManager.GetWriter("rawbeacons")
  41. mqttClient, err := bridge.NewMQTTClient(cfg, func(m mqtt.Message) {
  42. bridge.HandleMQTTMessage(m.Topic(), m.Payload(), appState, writer, ctx)
  43. })
  44. if err != nil {
  45. cleanup()
  46. return nil, err
  47. }
  48. if err := mqttClient.Subscribe(); err != nil {
  49. cleanup()
  50. return nil, err
  51. }
  52. return &BridgeApp{
  53. Cfg: cfg,
  54. KafkaManager: kafkaManager,
  55. AppState: appState,
  56. MQTT: mqttClient,
  57. ChApi: make(chan model.ApiUpdate, config.SMALL_CHANNEL_SIZE),
  58. ChAlert: make(chan model.Alert, config.SMALL_CHANNEL_SIZE),
  59. ChMqtt: make(chan []model.Tracker, config.SMALL_CHANNEL_SIZE),
  60. Cleanup: cleanup,
  61. }, nil
  62. }
  63. // Run starts Kafka consumers and the event loop until ctx is cancelled.
  64. func (a *BridgeApp) Run(ctx context.Context) {
  65. a.wg.Add(3)
  66. go kafkaclient.Consume(a.KafkaManager.GetReader("apibeacons"), a.ChApi, ctx, &a.wg)
  67. go kafkaclient.Consume(a.KafkaManager.GetReader("alert"), a.ChAlert, ctx, &a.wg)
  68. go kafkaclient.Consume(a.KafkaManager.GetReader("mqtt"), a.ChMqtt, ctx, &a.wg)
  69. healthTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL)
  70. defer healthTicker.Stop()
  71. for {
  72. select {
  73. case <-ctx.Done():
  74. return
  75. case <-healthTicker.C:
  76. health, err := a.AppState.GetBridgeHealth(a.KafkaManager)
  77. if err != nil {
  78. slog.Error("getting bridge health", "err", err)
  79. continue
  80. }
  81. m := kafka.Message{
  82. Value: health,
  83. }
  84. if err := kafkaclient.Write(ctx, a.KafkaManager.GetWriter("healthbridge"), m); err != nil {
  85. slog.Error("writing bridge health", "err", err)
  86. continue
  87. }
  88. case msg := <-a.ChApi:
  89. switch msg.Method {
  90. case "POST":
  91. a.AppState.AddBeaconToLookup(msg.MAC, msg.ID)
  92. slog.Info("beacon added to lookup", "id", msg.ID)
  93. case "DELETE":
  94. if msg.MAC == "all" {
  95. a.AppState.CleanLookup()
  96. slog.Info("lookup cleared")
  97. continue
  98. }
  99. a.AppState.RemoveBeaconFromLookup(msg.MAC)
  100. slog.Info("beacon removed from lookup", "mac", msg.MAC)
  101. }
  102. case msg := <-a.ChAlert:
  103. p, err := json.Marshal(msg)
  104. if err != nil {
  105. slog.Error("marshaling alert", "err", err)
  106. continue
  107. }
  108. a.MQTT.Client.Publish("/alerts", 0, true, p)
  109. case msg := <-a.ChMqtt:
  110. p, err := json.Marshal(msg)
  111. if err != nil {
  112. slog.Error("marshaling trackers", "err", err)
  113. continue
  114. }
  115. a.MQTT.Client.Publish("/trackers", 0, true, p)
  116. }
  117. }
  118. }
  119. // Shutdown disconnects MQTT, waits for consumers, and cleans up.
  120. func (a *BridgeApp) Shutdown() {
  121. a.wg.Wait()
  122. if a.MQTT != nil {
  123. a.MQTT.Disconnect()
  124. }
  125. a.KafkaManager.CleanKafkaReaders()
  126. a.KafkaManager.CleanKafkaWriters()
  127. if a.Cleanup != nil {
  128. a.Cleanup()
  129. }
  130. slog.Info("bridge service shutdown complete")
  131. }