You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

210 lines
5.2 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "log"
  8. "log/slog"
  9. "os"
  10. "os/signal"
  11. "strings"
  12. "sync"
  13. "syscall"
  14. "time"
  15. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  16. "github.com/AFASystems/presence/internal/pkg/config"
  17. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  18. "github.com/AFASystems/presence/internal/pkg/model"
  19. mqtt "github.com/eclipse/paho.mqtt.golang"
  20. "github.com/segmentio/kafka-go"
  21. )
  22. var wg sync.WaitGroup
  23. func mqtthandler(writer *kafka.Writer, topic string, message []byte, appState *appcontext.AppState) {
  24. hostname := strings.Split(topic, "/")[1]
  25. msgStr := string(message)
  26. if strings.HasPrefix(msgStr, "[") {
  27. var readings []model.RawReading
  28. err := json.Unmarshal(message, &readings)
  29. if err != nil {
  30. log.Printf("Error parsing JSON: %v", err)
  31. return
  32. }
  33. for _, reading := range readings {
  34. if reading.Type == "Gateway" {
  35. continue
  36. }
  37. // fmt.Printf("reading: %+v\n", reading)
  38. if !appState.BeaconExists(reading.MAC) {
  39. continue
  40. }
  41. fmt.Printf("Tracking beacon: %s\n", reading.MAC)
  42. adv := model.BeaconAdvertisement{
  43. Hostname: hostname,
  44. MAC: reading.MAC,
  45. RSSI: int64(reading.RSSI),
  46. Data: reading.RawData,
  47. }
  48. encodedMsg, err := json.Marshal(adv)
  49. if err != nil {
  50. fmt.Println("Error in marshaling: ", err)
  51. break
  52. }
  53. msg := kafka.Message{
  54. Value: encodedMsg,
  55. }
  56. err = writer.WriteMessages(context.Background(), msg)
  57. if err != nil {
  58. fmt.Println("Error in writing to Kafka: ", err)
  59. time.Sleep(1 * time.Second)
  60. break
  61. }
  62. }
  63. }
  64. // } else {
  65. // s := strings.Split(string(message), ",")
  66. // if len(s) < 6 {
  67. // log.Printf("Messaggio CSV non valido: %s", msgStr)
  68. // return
  69. // }
  70. // rawdata := s[4]
  71. // buttonCounter := parseButtonState(rawdata)
  72. // if buttonCounter > 0 {
  73. // adv := model.BeaconAdvertisement{}
  74. // i, _ := strconv.ParseInt(s[3], 10, 64)
  75. // adv.Hostname = hostname
  76. // adv.BeaconType = "hb_button"
  77. // adv.MAC = s[1]
  78. // adv.RSSI = i
  79. // adv.Data = rawdata
  80. // adv.HSButtonCounter = buttonCounter
  81. // read_line := strings.TrimRight(string(s[5]), "\r\n")
  82. // it, err33 := strconv.Atoi(read_line)
  83. // if err33 != nil {
  84. // fmt.Println(it)
  85. // fmt.Println(err33)
  86. // os.Exit(2)
  87. // }
  88. // }
  89. // }
  90. }
  91. var messagePubHandler = func(msg mqtt.Message, writer *kafka.Writer, appState *appcontext.AppState) {
  92. mqtthandler(writer, msg.Topic(), msg.Payload(), appState)
  93. }
  94. var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
  95. fmt.Println("Connected")
  96. }
  97. var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
  98. fmt.Printf("Connect lost: %v", err)
  99. }
  100. func main() {
  101. // Load global context to init beacons and latest list
  102. appState := appcontext.NewAppState()
  103. cfg := config.Load()
  104. // Create log file -> this section and below can be moved in a package, as it is always the same
  105. logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
  106. if err != nil {
  107. log.Fatalf("Failed to open log file: %v\n", err)
  108. }
  109. // shell and log file multiwriter
  110. w := io.MultiWriter(os.Stderr, logFile)
  111. logger := slog.New(slog.NewJSONHandler(w, nil))
  112. slog.SetDefault(logger)
  113. // define context
  114. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  115. defer stop()
  116. // define kafka reader
  117. apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "bridge-api")
  118. // define kafka writer
  119. writer := appState.AddKafkaWriter(cfg.KafkaURL, "rawbeacons")
  120. slog.Info("Bridge initialized, subscribed to kafka topics")
  121. chApi := make(chan model.ApiUpdate, 200)
  122. wg.Add(1)
  123. go kafkaclient.Consume(apiReader, chApi, ctx, &wg)
  124. opts := mqtt.NewClientOptions()
  125. opts.AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.MQTTHost, 1883))
  126. opts.SetClientID("go_mqtt_client")
  127. opts.SetUsername("emqx")
  128. opts.SetPassword("public")
  129. opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) { messagePubHandler(m, writer, appState) })
  130. opts.OnConnect = connectHandler
  131. opts.OnConnectionLost = connectLostHandler
  132. client := mqtt.NewClient(opts)
  133. if token := client.Connect(); token.Wait() && token.Error() != nil {
  134. panic(token.Error())
  135. }
  136. sub(client)
  137. eventloop:
  138. for {
  139. select {
  140. case <-ctx.Done():
  141. break eventloop
  142. case msg := <-chApi:
  143. switch msg.Method {
  144. case "POST":
  145. id := msg.Beacon.ID
  146. appState.AddBeaconToLookup(id)
  147. lMsg := fmt.Sprintf("Beacon added to lookup: %s", id)
  148. slog.Info(lMsg)
  149. case "DELETE":
  150. id := msg.Beacon.ID
  151. appState.RemoveBeaconFromLookup(id)
  152. lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id)
  153. slog.Info(lMsg)
  154. }
  155. }
  156. }
  157. slog.Info("broken out of the main event loop")
  158. wg.Wait()
  159. slog.Info("All go routines have stopped, Beggining to close Kafka connections")
  160. appState.CleanKafkaReaders()
  161. appState.CleanKafkaWriters()
  162. client.Disconnect(250)
  163. slog.Info("Closing connection to MQTT broker")
  164. }
  165. func publish(client mqtt.Client) {
  166. num := 10
  167. for i := 0; i < num; i++ {
  168. text := fmt.Sprintf("Message %d", i)
  169. token := client.Publish("topic/test", 0, false, text)
  170. token.Wait()
  171. time.Sleep(time.Second)
  172. }
  173. }
  174. func sub(client mqtt.Client) {
  175. topic := "publish_out/#"
  176. token := client.Subscribe(topic, 1, nil)
  177. token.Wait()
  178. fmt.Printf("Subscribed to topic: %s\n", topic)
  179. }