Vous ne pouvez pas sélectionner plus de 25 sujets. Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.
 
 
 
 

232 lignes
5.9 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "log"
  8. "log/slog"
  9. "os"
  10. "os/signal"
  11. "strings"
  12. "sync"
  13. "syscall"
  14. "time"
  15. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  16. "github.com/AFASystems/presence/internal/pkg/config"
  17. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  18. "github.com/AFASystems/presence/internal/pkg/model"
  19. mqtt "github.com/eclipse/paho.mqtt.golang"
  20. "github.com/segmentio/kafka-go"
  21. )
  // wg is the package-level WaitGroup shared with the Kafka consumer
  // goroutines: main calls wg.Add(3), hands &wg to each kafkaclient.Consume
  // call, and blocks on wg.Wait() after the event loop exits.
  // NOTE(review): presumably each consumer calls wg.Done() when it returns —
  // confirm in kafkaclient.Consume.
  22. var wg sync.WaitGroup
  23. func mqtthandler(writer *kafka.Writer, topic string, message []byte, appState *appcontext.AppState) {
  24. hostname := strings.Split(topic, "/")[1]
  25. msgStr := string(message)
  26. if strings.HasPrefix(msgStr, "[") {
  27. var readings []model.RawReading
  28. err := json.Unmarshal(message, &readings)
  29. if err != nil {
  30. log.Printf("Error parsing JSON: %v", err)
  31. return
  32. }
  33. for _, reading := range readings {
  34. if reading.Type == "Gateway" {
  35. continue
  36. }
  37. val, ok := appState.BeaconExists(reading.MAC)
  38. // fmt.Printf("reading: %+v\n", reading)
  39. if !ok {
  40. continue
  41. }
  42. adv := model.BeaconAdvertisement{
  43. ID: val,
  44. Hostname: hostname,
  45. MAC: reading.MAC,
  46. RSSI: int64(reading.RSSI),
  47. Data: reading.RawData,
  48. }
  49. encodedMsg, err := json.Marshal(adv)
  50. if err != nil {
  51. fmt.Println("Error in marshaling: ", err)
  52. break
  53. }
  54. msg := kafka.Message{
  55. Value: encodedMsg,
  56. }
  57. err = writer.WriteMessages(context.Background(), msg)
  58. if err != nil {
  59. fmt.Println("Error in writing to Kafka: ", err)
  60. time.Sleep(1 * time.Second)
  61. break
  62. }
  63. }
  64. }
  65. // } else {
  66. // s := strings.Split(string(message), ",")
  67. // if len(s) < 6 {
  68. // log.Printf("Messaggio CSV non valido: %s", msgStr)
  69. // return
  70. // }
  71. // rawdata := s[4]
  72. // buttonCounter := parseButtonState(rawdata)
  73. // if buttonCounter > 0 {
  74. // adv := model.BeaconAdvertisement{}
  75. // i, _ := strconv.ParseInt(s[3], 10, 64)
  76. // adv.Hostname = hostname
  77. // adv.BeaconType = "hb_button"
  78. // adv.MAC = s[1]
  79. // adv.RSSI = i
  80. // adv.Data = rawdata
  81. // adv.HSButtonCounter = buttonCounter
  82. // read_line := strings.TrimRight(string(s[5]), "\r\n")
  83. // it, err33 := strconv.Atoi(read_line)
  84. // if err33 != nil {
  85. // fmt.Println(it)
  86. // fmt.Println(err33)
  87. // os.Exit(2)
  88. // }
  89. // }
  90. // }
  91. }
  92. var messagePubHandler = func(msg mqtt.Message, writer *kafka.Writer, appState *appcontext.AppState) {
  93. mqtthandler(writer, msg.Topic(), msg.Payload(), appState)
  94. }
  95. var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
  96. fmt.Println("Connected")
  97. }
  98. var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
  99. fmt.Printf("Connect lost: %v", err)
  100. }
  101. func main() {
  102. // Load global context to init beacons and latest list
  103. appState := appcontext.NewAppState()
  104. cfg := config.Load()
  105. // Create log file -> this section and below can be moved in a package, as it is always the same
  106. logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
  107. if err != nil {
  108. log.Fatalf("Failed to open log file: %v\n", err)
  109. }
  110. // shell and log file multiwriter
  111. w := io.MultiWriter(os.Stderr, logFile)
  112. logger := slog.New(slog.NewJSONHandler(w, nil))
  113. slog.SetDefault(logger)
  114. // define context
  115. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  116. defer stop()
  117. // define kafka readers
  118. apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "bridge-api")
  119. alertReader := appState.AddKafkaReader(cfg.KafkaURL, "alert", "bridge-alert")
  120. mqttReader := appState.AddKafkaReader(cfg.KafkaURL, "mqtt", "bridge-mqtt")
  121. // define kafka writer
  122. writer := appState.AddKafkaWriter(cfg.KafkaURL, "rawbeacons")
  123. slog.Info("Bridge initialized, subscribed to kafka topics")
  124. chApi := make(chan model.ApiUpdate, 200)
  125. chAlert := make(chan model.Alert, 200)
  126. chMqtt := make(chan []model.Tracker, 200)
  127. wg.Add(3)
  128. go kafkaclient.Consume(apiReader, chApi, ctx, &wg)
  129. go kafkaclient.Consume(alertReader, chAlert, ctx, &wg)
  130. go kafkaclient.Consume(mqttReader, chMqtt, ctx, &wg)
  131. opts := mqtt.NewClientOptions()
  132. opts.AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.MQTTHost, 1883))
  133. opts.SetClientID("go_mqtt_client")
  134. opts.SetAutoReconnect(true)
  135. opts.SetConnectRetry(true)
  136. opts.SetConnectRetryInterval(1 * time.Second)
  137. opts.SetMaxReconnectInterval(600 * time.Second)
  138. opts.SetCleanSession(false)
  139. opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) { messagePubHandler(m, writer, appState) })
  140. opts.OnConnect = connectHandler
  141. opts.OnConnectionLost = connectLostHandler
  142. client := mqtt.NewClient(opts)
  143. if token := client.Connect(); token.Wait() && token.Error() != nil {
  144. panic(token.Error())
  145. }
  146. sub(client)
  147. eventloop:
  148. for {
  149. select {
  150. case <-ctx.Done():
  151. break eventloop
  152. case msg := <-chApi:
  153. switch msg.Method {
  154. case "POST":
  155. id := msg.ID
  156. appState.AddBeaconToLookup(msg.MAC, id)
  157. lMsg := fmt.Sprintf("Beacon added to lookup: %s", id)
  158. slog.Info(lMsg)
  159. case "DELETE":
  160. id := msg.MAC
  161. if id == "all" {
  162. appState.CleanLookup()
  163. fmt.Println("cleaned up lookup map")
  164. continue
  165. }
  166. appState.RemoveBeaconFromLookup(id)
  167. lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id)
  168. slog.Info(lMsg)
  169. }
  170. case msg := <-chAlert:
  171. fmt.Printf("Alerts: %+v\n", msg)
  172. p, err := json.Marshal(msg)
  173. if err != nil {
  174. continue
  175. }
  176. client.Publish("/alerts", 0, true, p)
  177. case msg := <-chMqtt:
  178. fmt.Printf("trackers: %+v\n", msg)
  179. p, err := json.Marshal(msg)
  180. if err != nil {
  181. continue
  182. }
  183. client.Publish("/trackers", 0, true, p)
  184. }
  185. }
  186. slog.Info("broken out of the main event loop")
  187. wg.Wait()
  188. slog.Info("All go routines have stopped, Beggining to close Kafka connections")
  189. appState.CleanKafkaReaders()
  190. appState.CleanKafkaWriters()
  191. client.Disconnect(250)
  192. slog.Info("Closing connection to MQTT broker")
  193. }
  194. func sub(client mqtt.Client) {
  195. topic := "publish_out/#"
  196. token := client.Subscribe(topic, 1, nil)
  197. token.Wait()
  198. fmt.Printf("Subscribed to topic: %s\n", topic)
  199. }