Ви не можете вибрати більше 25 тем. Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.
 
 
 
 

141 рядки
4.1 KiB

  1. package bridge
  2. import (
  3. "context"
  4. "encoding/json"
  5. "log/slog"
  6. "sync"
  7. "time"
  8. "github.com/AFASystems/presence/internal/pkg/bridge"
  9. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  10. "github.com/AFASystems/presence/internal/pkg/config"
  11. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  12. "github.com/AFASystems/presence/internal/pkg/logger"
  13. "github.com/AFASystems/presence/internal/pkg/model"
  14. mqtt "github.com/eclipse/paho.mqtt.golang"
  15. "github.com/segmentio/kafka-go"
  16. )
  17. // BridgeApp holds dependencies for the bridge service (MQTT <-> Kafka).
  18. type BridgeApp struct {
  19. Cfg *config.Config
  20. KafkaManager *kafkaclient.KafkaManager
  21. AppState *appcontext.AppState
  22. MQTT *bridge.MQTTClient
  23. ChApi chan model.ApiUpdate
  24. ChAlert chan model.Alert
  25. ChMqtt chan []model.Tracker
  26. Cleanup func()
  27. wg sync.WaitGroup
  28. }
  29. // New creates a BridgeApp with Kafka readers (apibeacons, alert, mqtt), writer (rawbeacons), and MQTT client.
  30. func New(cfg *config.Config) (*BridgeApp, error) {
  31. appState := appcontext.NewAppState()
  32. kafkaManager := kafkaclient.InitKafkaManager()
  33. srvLogger, cleanup := logger.CreateLogger("bridge.log")
  34. slog.SetDefault(srvLogger)
  35. readerTopics := []string{"apibeacons", "alert", "mqtt"}
  36. writerTopics := []string{"rawbeacons", "healthbridge"}
  37. kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "bridge", readerTopics)
  38. kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)
  39. slog.Info("bridge service initialized", "readers", readerTopics, "writers", writerTopics)
  40. writer := kafkaManager.GetWriter("rawbeacons")
  41. mqttClient, err := bridge.NewMQTTClient(cfg, func(m mqtt.Message) {
  42. bridge.HandleMQTTMessage(m.Topic(), m.Payload(), appState, writer)
  43. })
  44. if err != nil {
  45. cleanup()
  46. return nil, err
  47. }
  48. mqttClient.Subscribe()
  49. return &BridgeApp{
  50. Cfg: cfg,
  51. KafkaManager: kafkaManager,
  52. AppState: appState,
  53. MQTT: mqttClient,
  54. ChApi: make(chan model.ApiUpdate, config.SMALL_CHANNEL_SIZE),
  55. ChAlert: make(chan model.Alert, config.SMALL_CHANNEL_SIZE),
  56. ChMqtt: make(chan []model.Tracker, config.SMALL_CHANNEL_SIZE),
  57. Cleanup: cleanup,
  58. }, nil
  59. }
  60. // Run starts Kafka consumers and the event loop until ctx is cancelled.
  61. func (a *BridgeApp) Run(ctx context.Context) {
  62. a.wg.Add(3)
  63. go kafkaclient.Consume(a.KafkaManager.GetReader("apibeacons"), a.ChApi, ctx, &a.wg)
  64. go kafkaclient.Consume(a.KafkaManager.GetReader("alert"), a.ChAlert, ctx, &a.wg)
  65. go kafkaclient.Consume(a.KafkaManager.GetReader("mqtt"), a.ChMqtt, ctx, &a.wg)
  66. healthTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL)
  67. defer healthTicker.Stop()
  68. for {
  69. select {
  70. case <-ctx.Done():
  71. return
  72. case <-healthTicker.C:
  73. health, err := a.AppState.GetBridgeHealth(a.KafkaManager)
  74. if err != nil {
  75. slog.Error("getting bridge health", "err", err)
  76. continue
  77. }
  78. m := kafka.Message{
  79. Value: health,
  80. }
  81. if err := kafkaclient.Write(ctx, a.KafkaManager.GetWriter("healthbridge"), m); err != nil {
  82. slog.Error("writing bridge health", "err", err)
  83. continue
  84. }
  85. case msg := <-a.ChApi:
  86. switch msg.Method {
  87. case "POST":
  88. a.AppState.AddBeaconToLookup(msg.MAC, msg.ID)
  89. slog.Info("beacon added to lookup", "id", msg.ID)
  90. case "DELETE":
  91. if msg.MAC == "all" {
  92. a.AppState.CleanLookup()
  93. slog.Info("lookup cleared")
  94. continue
  95. }
  96. a.AppState.RemoveBeaconFromLookup(msg.MAC)
  97. slog.Info("beacon removed from lookup", "mac", msg.MAC)
  98. }
  99. case msg := <-a.ChAlert:
  100. p, err := json.Marshal(msg)
  101. if err != nil {
  102. slog.Error("marshaling alert", "err", err)
  103. continue
  104. }
  105. a.MQTT.Client.Publish("/alerts", 0, true, p)
  106. case msg := <-a.ChMqtt:
  107. p, err := json.Marshal(msg)
  108. if err != nil {
  109. slog.Error("marshaling trackers", "err", err)
  110. continue
  111. }
  112. a.MQTT.Client.Publish("/trackers", 0, true, p)
  113. }
  114. }
  115. }
  116. // Shutdown disconnects MQTT, waits for consumers, and cleans up.
  117. func (a *BridgeApp) Shutdown() {
  118. a.wg.Wait()
  119. if a.MQTT != nil {
  120. a.MQTT.Disconnect()
  121. }
  122. a.KafkaManager.CleanKafkaReaders()
  123. a.KafkaManager.CleanKafkaWriters()
  124. if a.Cleanup != nil {
  125. a.Cleanup()
  126. }
  127. slog.Info("bridge service shutdown complete")
  128. }