No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.
 
 
 
 

230 líneas
6.3 KiB

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "math"
  6. "strconv"
  7. "time"
  8. "github.com/AFASystems/presence/internal/pkg/config"
  9. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  10. "github.com/AFASystems/presence/internal/pkg/model"
  11. "github.com/AFASystems/presence/internal/pkg/mqttclient"
  12. presenseredis "github.com/AFASystems/presence/internal/pkg/redis"
  13. "github.com/redis/go-redis/v9"
  14. )
  15. // Move Kafka topics, Redis keys, intervals to env config
  16. // Replace hardcoded IPs with env vars
  17. // avoid defers -> lock and unlock right before and after usage
  18. // Distance formula uses twos_comp incorrectly should parse signed int not hex string
  19. // Use buffered log instead of fmt.Println ???
  20. // Limit metrics slice size with ring buffer ??
  21. // handle go routine exit signals with context.WithCancel() ??
  22. // Make internal package for Kafka and Redis
  23. // Make internal package for processor:
  24. // Helper functions: twos_comp, getBeaconId
  25. func main() {
  26. // Load global context to init beacons and latest list
  27. appCtx := model.AppContext{
  28. Beacons: model.BeaconsList{
  29. Beacons: make(map[string]model.Beacon),
  30. },
  31. LatestList: model.LatestBeaconsList{
  32. LatestList: make(map[string]model.Beacon),
  33. },
  34. Settings: model.Settings{
  35. Settings: model.SettingsVal{
  36. Location_confidence: 4,
  37. Last_seen_threshold: 15,
  38. Beacon_metrics_size: 30,
  39. HA_send_interval: 5,
  40. HA_send_changes_only: false,
  41. },
  42. },
  43. }
  44. cfg := config.Load()
  45. // Kafka writer idk why yet
  46. writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "beacons")
  47. defer writer.Close()
  48. // Kafka reader for Raw MQTT beacons
  49. rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "someID")
  50. defer rawReader.Close()
  51. // Kafka reader for API server updates
  52. apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "someID")
  53. defer apiReader.Close()
  54. // Kafka reader for latest list updates
  55. latestReader := kafkaclient.KafkaReader(cfg.KafkaURL, "latestbeacons", "someID")
  56. defer latestReader.Close()
  57. // Kafka reader for settings updates
  58. settingsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "settings", "someID")
  59. defer settingsReader.Close()
  60. ctx := context.Background()
  61. // Init Redis Client
  62. client := redis.NewClient(&redis.Options{
  63. Addr: cfg.RedisURL,
  64. Password: "",
  65. })
  66. beaconsList := presenseredis.LoadBeaconsList(client, ctx)
  67. appCtx.Beacons.Beacons = beaconsList
  68. latestList := presenseredis.LoadLatestList(client, ctx)
  69. appCtx.LatestList.LatestList = latestList
  70. settings := presenseredis.LoadSettings(client, ctx)
  71. appCtx.Settings.Settings = settings
  72. // declare channel for collecting Kafka messages
  73. chRaw := make(chan model.Incoming_json, 2000)
  74. chApi := make(chan model.ApiUpdate, 2000)
  75. chLatest := make(chan model.Incoming_json, 2000)
  76. chSettings := make(chan model.SettingsVal, 10)
  77. go kafkaclient.Consume(rawReader, chRaw)
  78. go kafkaclient.Consume(apiReader, chApi)
  79. go kafkaclient.Consume(latestReader, chLatest)
  80. go kafkaclient.Consume(settingsReader, chSettings)
  81. go func() {
  82. // Syncing Redis cache every 1s with 2 lists: beacons, latest list
  83. ticker := time.NewTicker(1 * time.Second)
  84. defer ticker.Stop()
  85. for range ticker.C {
  86. presenseredis.SaveBeaconsList(&appCtx, client, ctx)
  87. presenseredis.SaveLatestList(&appCtx, client, ctx)
  88. presenseredis.SaveSettings(&appCtx, client, ctx)
  89. }
  90. }()
  91. for {
  92. select {
  93. case msg := <-chRaw:
  94. processIncoming(msg, &appCtx)
  95. case msg := <-chApi:
  96. switch msg.Method {
  97. case "POST":
  98. appCtx.Beacons.Lock.Lock()
  99. appCtx.Beacons.Beacons[msg.Beacon.Beacon_id] = msg.Beacon
  100. case "DELETE":
  101. _, exists := appCtx.Beacons.Beacons[msg.ID]
  102. if exists {
  103. appCtx.Beacons.Lock.Lock()
  104. delete(appCtx.Beacons.Beacons, msg.ID)
  105. }
  106. default:
  107. fmt.Println("unknown method: ", msg.Method)
  108. }
  109. appCtx.Beacons.Lock.Unlock()
  110. case msg := <-chLatest:
  111. fmt.Println("latest msg: ", msg)
  112. case msg := <-chSettings:
  113. appCtx.Settings.Lock.Lock()
  114. appCtx.Settings.Settings = msg
  115. fmt.Println("settings channel: ", msg)
  116. appCtx.Settings.Lock.Unlock()
  117. }
  118. }
  119. }
  120. func processIncoming(incoming model.Incoming_json, ctx *model.AppContext) {
  121. defer func() {
  122. if err := recover(); err != nil {
  123. fmt.Println("work failed:", err)
  124. }
  125. }()
  126. incoming = mqttclient.IncomingBeaconFilter(incoming)
  127. id := mqttclient.GetBeaconID(incoming)
  128. now := time.Now().Unix()
  129. beacons := &ctx.Beacons
  130. beacons.Lock.Lock()
  131. defer beacons.Lock.Unlock()
  132. latestList := &ctx.LatestList
  133. latestList.Lock.Lock()
  134. defer latestList.Lock.Unlock()
  135. beacon, exists := beacons.Beacons[id]
  136. if !exists {
  137. x, exists := latestList.LatestList[id]
  138. if exists {
  139. x.Last_seen = now
  140. x.Incoming_JSON = incoming
  141. x.Distance = getBeaconDistance(incoming)
  142. latestList.LatestList[id] = x
  143. } else {
  144. latestList.LatestList[id] = model.Beacon{Beacon_id: id, Beacon_type: incoming.Beacon_type, Last_seen: now, Incoming_JSON: incoming, Beacon_location: incoming.Hostname, Distance: getBeaconDistance(incoming)}
  145. }
  146. // Move this to seperate routine?
  147. for k, v := range latestList.LatestList {
  148. if (now - v.Last_seen) > 10 {
  149. delete(latestList.LatestList, k)
  150. }
  151. }
  152. return
  153. }
  154. updateBeacon(&beacon, incoming)
  155. beacons.Beacons[id] = beacon
  156. }
  157. func getBeaconDistance(incoming model.Incoming_json) float64 {
  158. rssi := incoming.RSSI
  159. power := incoming.TX_power
  160. distance := 100.0
  161. ratio := float64(rssi) * (1.0 / float64(twos_comp(power)))
  162. if ratio < 1.0 {
  163. distance = math.Pow(ratio, 10)
  164. } else {
  165. distance = (0.89976)*math.Pow(ratio, 7.7095) + 0.111
  166. }
  167. return distance
  168. }
  169. func updateBeacon(beacon *model.Beacon, incoming model.Incoming_json) {
  170. now := time.Now().Unix()
  171. beacon.Incoming_JSON = incoming
  172. beacon.Last_seen = now
  173. beacon.Beacon_type = incoming.Beacon_type
  174. beacon.HB_ButtonCounter = incoming.HB_ButtonCounter
  175. beacon.HB_Battery = incoming.HB_Battery
  176. beacon.HB_RandomNonce = incoming.HB_RandomNonce
  177. beacon.HB_ButtonMode = incoming.HB_ButtonMode
  178. if beacon.Beacon_metrics == nil {
  179. beacon.Beacon_metrics = make([]model.BeaconMetric, 10) // 10 is a placeholder for now
  180. }
  181. metric := model.BeaconMetric{}
  182. metric.Distance = getBeaconDistance(incoming)
  183. metric.Timestamp = now
  184. metric.Rssi = int64(incoming.RSSI)
  185. metric.Location = incoming.Hostname
  186. beacon.Beacon_metrics = append(beacon.Beacon_metrics, metric)
  187. // Leave the HB button implementation for now
  188. }
  189. func twos_comp(inp string) int64 {
  190. i, _ := strconv.ParseInt("0x"+inp, 0, 64)
  191. return i - 256
  192. }