No puede seleccionar más de 25 temas. Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.
 
 
 
 

224 líneas
5.7 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "log/slog"
  8. "os/signal"
  9. "strings"
  10. "sync"
  11. "syscall"
  12. "time"
  13. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  14. "github.com/AFASystems/presence/internal/pkg/config"
  15. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  16. "github.com/AFASystems/presence/internal/pkg/logger"
  17. "github.com/AFASystems/presence/internal/pkg/model"
  18. mqtt "github.com/eclipse/paho.mqtt.golang"
  19. "github.com/segmentio/kafka-go"
  20. )
  // wg is handed to the three kafkaclient.Consume goroutines started in main,
  // and main waits on it during shutdown before the Kafka clients are cleaned up.
  21. var wg sync.WaitGroup
  22. func mqtthandler(writer *kafka.Writer, topic string, message []byte, appState *appcontext.AppState) {
  23. hostname := strings.Split(topic, "/")[1]
  24. msgStr := string(message)
  25. if strings.HasPrefix(msgStr, "[") {
  26. var readings []model.RawReading
  27. err := json.Unmarshal(message, &readings)
  28. if err != nil {
  29. log.Printf("Error parsing JSON: %v", err)
  30. return
  31. }
  32. for _, reading := range readings {
  33. if reading.Type == "Gateway" {
  34. continue
  35. }
  36. val, ok := appState.BeaconExists(reading.MAC)
  37. // fmt.Printf("reading: %+v\n", reading)
  38. if !ok {
  39. continue
  40. }
  41. adv := model.BeaconAdvertisement{
  42. ID: val,
  43. Hostname: hostname,
  44. MAC: reading.MAC,
  45. RSSI: int64(reading.RSSI),
  46. Data: reading.RawData,
  47. }
  48. encodedMsg, err := json.Marshal(adv)
  49. if err != nil {
  50. fmt.Println("Error in marshaling: ", err)
  51. break
  52. }
  53. msg := kafka.Message{
  54. Value: encodedMsg,
  55. }
  56. err = writer.WriteMessages(context.Background(), msg)
  57. if err != nil {
  58. fmt.Println("Error in writing to Kafka: ", err)
  59. time.Sleep(1 * time.Second)
  60. break
  61. }
  62. }
  63. }
  64. // } else {
  65. // s := strings.Split(string(message), ",")
  66. // if len(s) < 6 {
  67. // log.Printf("Messaggio CSV non valido: %s", msgStr)
  68. // return
  69. // }
  70. // rawdata := s[4]
  71. // buttonCounter := parseButtonState(rawdata)
  72. // if buttonCounter > 0 {
  73. // adv := model.BeaconAdvertisement{}
  74. // i, _ := strconv.ParseInt(s[3], 10, 64)
  75. // adv.Hostname = hostname
  76. // adv.BeaconType = "hb_button"
  77. // adv.MAC = s[1]
  78. // adv.RSSI = i
  79. // adv.Data = rawdata
  80. // adv.HSButtonCounter = buttonCounter
  81. // read_line := strings.TrimRight(string(s[5]), "\r\n")
  82. // it, err33 := strconv.Atoi(read_line)
  83. // if err33 != nil {
  84. // fmt.Println(it)
  85. // fmt.Println(err33)
  86. // os.Exit(2)
  87. // }
  88. // }
  89. // }
  90. }
  91. var messagePubHandler = func(msg mqtt.Message, writer *kafka.Writer, appState *appcontext.AppState) {
  92. mqtthandler(writer, msg.Topic(), msg.Payload(), appState)
  93. }
  94. var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
  95. fmt.Println("Connected")
  96. }
  97. var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
  98. fmt.Printf("Connect lost: %v", err)
  99. }
  100. func main() {
  101. // Load global context to init beacons and latest list
  102. appState := appcontext.NewAppState()
  103. cfg := config.Load()
  104. // Set logger -> terminal and log file
  105. slog.SetDefault(logger.CreateLogger("bridge.log"))
  106. // define context
  107. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  108. defer stop()
  109. // define kafka readers
  110. apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "bridge-api")
  111. alertReader := appState.AddKafkaReader(cfg.KafkaURL, "alert", "bridge-alert")
  112. mqttReader := appState.AddKafkaReader(cfg.KafkaURL, "mqtt", "bridge-mqtt")
  113. // define kafka writer
  114. writer := appState.AddKafkaWriter(cfg.KafkaURL, "rawbeacons")
  115. slog.Info("Bridge initialized, subscribed to kafka topics")
  116. chApi := make(chan model.ApiUpdate, 200)
  117. chAlert := make(chan model.Alert, 200)
  118. chMqtt := make(chan []model.Tracker, 200)
  119. wg.Add(3)
  120. go kafkaclient.Consume(apiReader, chApi, ctx, &wg)
  121. go kafkaclient.Consume(alertReader, chAlert, ctx, &wg)
  122. go kafkaclient.Consume(mqttReader, chMqtt, ctx, &wg)
  123. opts := mqtt.NewClientOptions()
  124. opts.AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.MQTTHost, 1883))
  125. opts.SetClientID("go_mqtt_client")
  126. opts.SetAutoReconnect(true)
  127. opts.SetConnectRetry(true)
  128. opts.SetConnectRetryInterval(1 * time.Second)
  129. opts.SetMaxReconnectInterval(600 * time.Second)
  130. opts.SetCleanSession(false)
  131. opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) { messagePubHandler(m, writer, appState) })
  132. opts.OnConnect = connectHandler
  133. opts.OnConnectionLost = connectLostHandler
  134. client := mqtt.NewClient(opts)
  135. if token := client.Connect(); token.Wait() && token.Error() != nil {
  136. panic(token.Error())
  137. }
  138. sub(client)
  139. eventloop:
  140. for {
  141. select {
  142. case <-ctx.Done():
  143. break eventloop
  144. case msg := <-chApi:
  145. switch msg.Method {
  146. case "POST":
  147. id := msg.ID
  148. appState.AddBeaconToLookup(msg.MAC, id)
  149. lMsg := fmt.Sprintf("Beacon added to lookup: %s", id)
  150. slog.Info(lMsg)
  151. case "DELETE":
  152. id := msg.MAC
  153. if id == "all" {
  154. appState.CleanLookup()
  155. fmt.Println("cleaned up lookup map")
  156. continue
  157. }
  158. appState.RemoveBeaconFromLookup(id)
  159. lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id)
  160. slog.Info(lMsg)
  161. }
  162. case msg := <-chAlert:
  163. fmt.Printf("Alerts: %+v\n", msg)
  164. p, err := json.Marshal(msg)
  165. if err != nil {
  166. continue
  167. }
  168. client.Publish("/alerts", 0, true, p)
  169. case msg := <-chMqtt:
  170. fmt.Printf("trackers: %+v\n", msg)
  171. p, err := json.Marshal(msg)
  172. if err != nil {
  173. continue
  174. }
  175. client.Publish("/trackers", 0, true, p)
  176. }
  177. }
  178. slog.Info("broken out of the main event loop")
  179. wg.Wait()
  180. slog.Info("All go routines have stopped, Beggining to close Kafka connections")
  181. appState.CleanKafkaReaders()
  182. appState.CleanKafkaWriters()
  183. client.Disconnect(250)
  184. slog.Info("Closing connection to MQTT broker")
  185. }
  186. func sub(client mqtt.Client) {
  187. topic := "publish_out/#"
  188. token := client.Subscribe(topic, 1, nil)
  189. token.Wait()
  190. fmt.Printf("Subscribed to topic: %s\n", topic)
  191. }