Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.
 
 
 
 

207 строк
5.2 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "log/slog"
  8. "os/signal"
  9. "strings"
  10. "sync"
  11. "syscall"
  12. "time"
  13. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  14. "github.com/AFASystems/presence/internal/pkg/config"
  15. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  16. "github.com/AFASystems/presence/internal/pkg/logger"
  17. "github.com/AFASystems/presence/internal/pkg/model"
  18. mqtt "github.com/eclipse/paho.mqtt.golang"
  19. "github.com/google/uuid"
  20. "github.com/segmentio/kafka-go"
  21. )
  22. var wg sync.WaitGroup
  23. func mqtthandler(writer *kafka.Writer, topic string, message []byte, appState *appcontext.AppState) {
  24. hostname := strings.Split(topic, "/")[1]
  25. msgStr := string(message)
  26. if strings.HasPrefix(msgStr, "[") {
  27. var readings []model.RawReading
  28. err := json.Unmarshal(message, &readings)
  29. if err != nil {
  30. log.Printf("Error parsing JSON: %v", err)
  31. return
  32. }
  33. for _, reading := range readings {
  34. if reading.Type == "Gateway" {
  35. continue
  36. }
  37. val, ok := appState.BeaconExists(reading.MAC)
  38. // fmt.Printf("reading: %+v\n", reading)
  39. if !ok {
  40. continue
  41. }
  42. adv := model.BeaconAdvertisement{
  43. ID: val,
  44. Hostname: hostname,
  45. MAC: reading.MAC,
  46. RSSI: int64(reading.RSSI),
  47. Data: reading.RawData,
  48. }
  49. encodedMsg, err := json.Marshal(adv)
  50. if err != nil {
  51. fmt.Println("Error in marshaling: ", err)
  52. break
  53. }
  54. msg := kafka.Message{
  55. Value: encodedMsg,
  56. }
  57. err = writer.WriteMessages(context.Background(), msg)
  58. if err != nil {
  59. fmt.Println("Error in writing to Kafka: ", err)
  60. time.Sleep(1 * time.Second)
  61. break
  62. }
  63. }
  64. } else {
  65. s := strings.Split(string(message), ",")
  66. if len(s) < 6 {
  67. log.Printf("Messaggio CSV non valido: %s", msgStr)
  68. return
  69. }
  70. fmt.Println("this gateway is also sending data: ", s)
  71. }
  72. }
  73. var messagePubHandler = func(msg mqtt.Message, writer *kafka.Writer, appState *appcontext.AppState) {
  74. mqtthandler(writer, msg.Topic(), msg.Payload(), appState)
  75. }
  76. var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
  77. fmt.Println("Connected")
  78. }
  79. var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
  80. fmt.Printf("Connect lost: %v", err)
  81. }
  82. func main() {
  83. // Load global context to init beacons and latest list
  84. appState := appcontext.NewAppState()
  85. cfg := config.Load()
  86. kafkaManager := kafkaclient.InitKafkaManager()
  87. // Set logger -> terminal and log file
  88. slog.SetDefault(logger.CreateLogger("bridge.log"))
  89. // define context
  90. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  91. defer stop()
  92. readerTopics := []string{"apibeacons", "alert", "mqtt"}
  93. kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "bridge", readerTopics)
  94. writerTopics := []string{"rawbeacons"}
  95. kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)
  96. slog.Info("Bridge initialized, subscribed to kafka topics")
  97. chApi := make(chan model.ApiUpdate, 200)
  98. chAlert := make(chan model.Alert, 200)
  99. chMqtt := make(chan []model.Tracker, 200)
  100. wg.Add(3)
  101. go kafkaclient.Consume(kafkaManager.GetReader("apibeacons"), chApi, ctx, &wg)
  102. go kafkaclient.Consume(kafkaManager.GetReader("alert"), chAlert, ctx, &wg)
  103. go kafkaclient.Consume(kafkaManager.GetReader("mqtt"), chMqtt, ctx, &wg)
  104. opts := mqtt.NewClientOptions()
  105. opts.AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.MQTTHost, 1883))
  106. cId := fmt.Sprintf("bridge-%s", uuid.New().String())
  107. opts.SetClientID(cId)
  108. opts.SetAutoReconnect(true)
  109. opts.SetConnectRetry(true)
  110. opts.SetConnectRetryInterval(1 * time.Second)
  111. opts.SetMaxReconnectInterval(600 * time.Second)
  112. opts.SetCleanSession(false)
  113. opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
  114. messagePubHandler(m, kafkaManager.GetWriter("rawbeacons"), appState)
  115. })
  116. opts.OnConnect = connectHandler
  117. opts.OnConnectionLost = connectLostHandler
  118. client := mqtt.NewClient(opts)
  119. if token := client.Connect(); token.Wait() && token.Error() != nil {
  120. panic(token.Error())
  121. }
  122. sub(client)
  123. eventloop:
  124. for {
  125. select {
  126. case <-ctx.Done():
  127. break eventloop
  128. case msg := <-chApi:
  129. switch msg.Method {
  130. case "POST":
  131. id := msg.ID
  132. appState.AddBeaconToLookup(msg.MAC, id)
  133. lMsg := fmt.Sprintf("Beacon added to lookup: %s", id)
  134. slog.Info(lMsg)
  135. case "DELETE":
  136. id := msg.MAC
  137. if id == "all" {
  138. appState.CleanLookup()
  139. fmt.Println("cleaned up lookup map")
  140. continue
  141. }
  142. appState.RemoveBeaconFromLookup(id)
  143. lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id)
  144. slog.Info(lMsg)
  145. }
  146. case msg := <-chAlert:
  147. p, err := json.Marshal(msg)
  148. if err != nil {
  149. continue
  150. }
  151. client.Publish("/alerts", 0, true, p)
  152. case msg := <-chMqtt:
  153. p, err := json.Marshal(msg)
  154. if err != nil {
  155. continue
  156. }
  157. client.Publish("/trackers", 0, true, p)
  158. }
  159. }
  160. slog.Info("broken out of the main event loop")
  161. wg.Wait()
  162. slog.Info("All go routines have stopped, Beggining to close Kafka connections")
  163. kafkaManager.CleanKafkaReaders()
  164. kafkaManager.CleanKafkaWriters()
  165. client.Disconnect(250)
  166. slog.Info("Closing connection to MQTT broker")
  167. }
  168. func sub(client mqtt.Client) {
  169. topic := "publish_out/#"
  170. token := client.Subscribe(topic, 1, nil)
  171. token.Wait()
  172. fmt.Printf("Subscribed to topic: %s\n", topic)
  173. }