You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

187 lines
4.6 KiB

  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "os"
  7. "os/signal"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/AFASystems/presence/internal/pkg/config"
  12. "github.com/AFASystems/presence/internal/pkg/httpserver"
  13. "github.com/AFASystems/presence/internal/pkg/model"
  14. "github.com/AFASystems/presence/internal/pkg/mqtt_client"
  15. "github.com/AFASystems/presence/internal/pkg/persistence"
  16. "github.com/boltdb/bolt"
  17. "github.com/gorilla/websocket"
  18. "github.com/yosssi/gmq/mqtt"
  19. "github.com/yosssi/gmq/mqtt/client"
  20. )
  21. func main() {
  22. sigc := make(chan os.Signal, 1)
  23. signal.Notify(sigc, os.Interrupt)
  24. cfg := config.Load()
  25. db, err := bolt.Open("presence.db", 0644, nil)
  26. if err != nil {
  27. log.Fatal(err)
  28. }
  29. defer db.Close()
  30. model.Db = db
  31. cli := client.New(&client.Options{
  32. ErrorHandler: func(err error) {
  33. fmt.Println(err)
  34. },
  35. })
  36. defer cli.Terminate()
  37. fmt.Println("host: ", cfg.MQTTHost, " Client ID: ", cfg.MQTTClientID, "user: ", cfg.MQTTUser)
  38. err = cli.Connect(&client.ConnectOptions{
  39. Network: "tcp",
  40. Address: cfg.MQTTHost,
  41. ClientID: []byte(cfg.MQTTClientID),
  42. UserName: []byte(cfg.MQTTUser),
  43. Password: []byte(cfg.MQTTPass),
  44. })
  45. if err != nil {
  46. fmt.Println("Error comes from here")
  47. panic(err)
  48. }
  49. ctx := &model.AppContext{
  50. HTTPResults: model.HTTPResultsList{
  51. HTTPResults: model.HTTPLocationsList{Beacons: []model.HTTPLocation{}},
  52. },
  53. Beacons: model.BeaconsList{
  54. Beacons: make(map[string]model.Beacon),
  55. },
  56. ButtonsList: make(map[string]model.Button),
  57. Settings: model.Settings{
  58. Location_confidence: 4,
  59. Last_seen_threshold: 15,
  60. Beacon_metrics_size: 30,
  61. HA_send_interval: 5,
  62. HA_send_changes_only: false,
  63. },
  64. Clients: make(map[*websocket.Conn]bool),
  65. Broadcast: make(chan model.Message, 100),
  66. Locations: model.LocationsList{Locations: make(map[string]model.Location)},
  67. LatestList: model.LatestBeaconsList{LatestList: make(map[string]model.Beacon)},
  68. }
  69. persistence.LoadState(model.Db, ctx)
  70. incomingChan := mqtt_client.IncomingMQTTProcessor(1*time.Second, cli, model.Db, ctx)
  71. err = cli.Subscribe(&client.SubscribeOptions{
  72. SubReqs: []*client.SubReq{
  73. &client.SubReq{
  74. TopicFilter: []byte("publish_out/#"),
  75. QoS: mqtt.QoS0,
  76. Handler: func(topicName, message []byte) {
  77. msgStr := string(message)
  78. t := strings.Split(string(topicName), "/")
  79. hostname := t[1]
  80. fmt.Println("hostname: ", hostname)
  81. if strings.HasPrefix(msgStr, "[") {
  82. var readings []model.RawReading
  83. err := json.Unmarshal(message, &readings)
  84. if err != nil {
  85. log.Printf("Errore parsing JSON: %v", err)
  86. return
  87. }
  88. for _, reading := range readings {
  89. if reading.Type == "Gateway" {
  90. continue
  91. }
  92. incoming := model.Incoming_json{
  93. Hostname: hostname,
  94. MAC: reading.MAC,
  95. RSSI: int64(reading.RSSI),
  96. Data: reading.RawData,
  97. HB_ButtonCounter: parseButtonState(reading.RawData),
  98. }
  99. incomingChan <- incoming
  100. }
  101. } else {
  102. s := strings.Split(string(message), ",")
  103. if len(s) < 6 {
  104. log.Printf("Messaggio CSV non valido: %s", msgStr)
  105. return
  106. }
  107. rawdata := s[4]
  108. buttonCounter := parseButtonState(rawdata)
  109. if buttonCounter > 0 {
  110. incoming := model.Incoming_json{}
  111. i, _ := strconv.ParseInt(s[3], 10, 64)
  112. incoming.Hostname = hostname
  113. incoming.Beacon_type = "hb_button"
  114. incoming.MAC = s[1]
  115. incoming.RSSI = i
  116. incoming.Data = rawdata
  117. incoming.HB_ButtonCounter = buttonCounter
  118. read_line := strings.TrimRight(string(s[5]), "\r\n")
  119. it, err33 := strconv.Atoi(read_line)
  120. if err33 != nil {
  121. fmt.Println(it)
  122. fmt.Println(err33)
  123. os.Exit(2)
  124. }
  125. incomingChan <- incoming
  126. }
  127. }
  128. },
  129. },
  130. },
  131. })
  132. if err != nil {
  133. panic(err)
  134. }
  135. fmt.Println("CONNECTED TO MQTT")
  136. fmt.Println("\n ")
  137. fmt.Println("Visit http://" + cfg.HTTPAddr + " on your browser to see the web interface")
  138. fmt.Println("\n ")
  139. go httpserver.StartHTTPServer(cfg.HTTPAddr, ctx)
  140. <-sigc
  141. if err := cli.Disconnect(); err != nil {
  142. panic(err)
  143. }
  144. }
  145. func parseButtonState(raw string) int64 {
  146. raw = strings.ToUpper(raw)
  147. if strings.HasPrefix(raw, "0201060303E1FF12") && len(raw) >= 38 {
  148. buttonField := raw[34:38]
  149. if buttonValue, err := strconv.ParseInt(buttonField, 16, 64); err == nil {
  150. return buttonValue
  151. }
  152. }
  153. if strings.HasPrefix(raw, "02010612FF590") && len(raw) >= 24 {
  154. counterField := raw[22:24]
  155. buttonState, err := strconv.ParseInt(counterField, 16, 64)
  156. if err == nil {
  157. return buttonState
  158. }
  159. }
  160. return 0
  161. }