Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.
 
 
 
 

252 wiersze
6.2 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "math"
  7. "strconv"
  8. "time"
  9. "github.com/AFASystems/presence/internal/pkg/config"
  10. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  11. "github.com/AFASystems/presence/internal/pkg/model"
  12. "github.com/segmentio/kafka-go"
  13. )
  14. func main() {
  15. // Load global context to init beacons and latest list
  16. appCtx := model.AppContext{
  17. Settings: model.Settings{
  18. Settings: model.SettingsVal{
  19. LocationConfidence: 4,
  20. LastSeenThreshold: 15,
  21. BeaconMetricSize: 30,
  22. HASendInterval: 5,
  23. HASendChangesOnly: false,
  24. RSSIEnforceThreshold: false,
  25. RSSIMinThreshold: 100,
  26. },
  27. },
  28. BeaconsLookup: make(map[string]struct{}),
  29. LatestList: model.LatestBeaconsList{
  30. LatestList: make(map[string]model.Beacon),
  31. },
  32. Beacons: model.BeaconsList{
  33. Beacons: make(map[string]model.Beacon),
  34. },
  35. }
  36. cfg := config.Load()
  37. // Kafka reader for Raw MQTT beacons
  38. rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc")
  39. defer rawReader.Close()
  40. // Kafka reader for API server updates
  41. apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api-loc")
  42. defer apiReader.Close()
  43. writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "locevents")
  44. defer writer.Close()
  45. fmt.Println("Locations algorithm initialized, subscribed to Kafka topics")
  46. locTicker := time.NewTicker(1 * time.Second)
  47. defer locTicker.Stop()
  48. chRaw := make(chan model.BeaconAdvertisement, 2000)
  49. chApi := make(chan model.ApiUpdate, 2000)
  50. go kafkaclient.Consume(rawReader, chRaw)
  51. go kafkaclient.Consume(apiReader, chApi)
  52. for {
  53. select {
  54. case <-locTicker.C:
  55. getLikelyLocations(&appCtx, writer)
  56. case msg := <-chRaw:
  57. assignBeaconToList(msg, &appCtx)
  58. case msg := <-chApi:
  59. switch msg.Method {
  60. case "POST":
  61. id := msg.Beacon.ID
  62. appCtx.BeaconsLookup[id] = struct{}{}
  63. case "DELETE":
  64. fmt.Println("Incoming delete message")
  65. }
  66. }
  67. }
  68. }
  69. func getLikelyLocations(ctx *model.AppContext, writer *kafka.Writer) {
  70. fmt.Println("get likely locations called")
  71. ctx.Beacons.Lock.Lock()
  72. beacons := ctx.Beacons.Beacons
  73. ctx.Beacons.Lock.Unlock()
  74. for _, beacon := range beacons {
  75. // Shrinking the model because other properties have nothing to do with the location
  76. r := model.HTTPLocation{
  77. Method: "Standard",
  78. Distance: 999,
  79. ID: beacon.ID,
  80. Location: "",
  81. LastSeen: 999,
  82. }
  83. mSize := len(beacon.BeaconMetrics)
  84. if (int64(time.Now().Unix()) - (beacon.BeaconMetrics[mSize-1].Timestamp)) > ctx.Settings.Settings.LastSeenThreshold {
  85. continue
  86. }
  87. locList := make(map[string]float64)
  88. seenW := 1.5
  89. rssiW := 0.75
  90. for _, metric := range beacon.BeaconMetrics {
  91. res := seenW + (rssiW * (1.0 - (float64(metric.RSSI) / -100.0)))
  92. locList[metric.Location] += res
  93. }
  94. bestLocName := ""
  95. maxScore := 0.0
  96. for locName, score := range locList {
  97. if score > maxScore {
  98. maxScore = score
  99. bestLocName = locName
  100. }
  101. }
  102. if bestLocName == beacon.PreviousLocation {
  103. beacon.LocationConfidence++
  104. } else {
  105. beacon.LocationConfidence = 0
  106. }
  107. r.Distance = beacon.BeaconMetrics[mSize-1].Distance
  108. r.Location = bestLocName
  109. r.LastSeen = beacon.BeaconMetrics[mSize-1].Timestamp
  110. if beacon.LocationConfidence == ctx.Settings.Settings.LocationConfidence && beacon.PreviousConfidentLocation != bestLocName {
  111. beacon.LocationConfidence = 0
  112. // Who do I need this if I am sending entire structure anyways? who knows
  113. js, err := json.Marshal(model.LocationChange{
  114. Method: "LocationChange",
  115. BeaconRef: beacon,
  116. Name: beacon.Name,
  117. PreviousLocation: beacon.PreviousConfidentLocation,
  118. NewLocation: bestLocName,
  119. Timestamp: time.Now().Unix(),
  120. })
  121. if err != nil {
  122. beacon.PreviousConfidentLocation = bestLocName
  123. beacon.PreviousLocation = bestLocName
  124. ctx.Beacons.Lock.Lock()
  125. ctx.Beacons.Beacons[beacon.ID] = beacon
  126. ctx.Beacons.Lock.Unlock()
  127. continue
  128. }
  129. msg := kafka.Message{
  130. Value: js,
  131. }
  132. err = writer.WriteMessages(context.Background(), msg)
  133. if err != nil {
  134. fmt.Println("Error in sending Kafka message")
  135. }
  136. }
  137. beacon.PreviousLocation = bestLocName
  138. ctx.Beacons.Lock.Lock()
  139. ctx.Beacons.Beacons[beacon.ID] = beacon
  140. ctx.Beacons.Lock.Unlock()
  141. js, err := json.Marshal(r)
  142. if err != nil {
  143. continue
  144. }
  145. msg := kafka.Message{
  146. Value: js,
  147. }
  148. err = writer.WriteMessages(context.Background(), msg)
  149. if err != nil {
  150. fmt.Println("Error in sending Kafka message")
  151. }
  152. }
  153. }
  154. func assignBeaconToList(adv model.BeaconAdvertisement, ctx *model.AppContext) {
  155. id := adv.MAC
  156. _, ok := ctx.BeaconsLookup[id]
  157. now := time.Now().Unix()
  158. if !ok {
  159. ctx.LatestList.Lock.Lock()
  160. ctx.LatestList.LatestList[id] = model.Beacon{ID: id, BeaconType: adv.BeaconType, LastSeen: now, IncomingJSON: adv, BeaconLocation: adv.Hostname, Distance: getBeaconDistance(adv)}
  161. ctx.LatestList.Lock.Unlock()
  162. return
  163. }
  164. fmt.Println("RSSI: ", adv.RSSI)
  165. if ctx.Settings.Settings.RSSIEnforceThreshold && (int64(adv.RSSI) < ctx.Settings.Settings.RSSIMinThreshold) {
  166. return
  167. }
  168. ctx.Beacons.Lock.Lock()
  169. beacon, ok := ctx.Beacons.Beacons[id]
  170. if !ok {
  171. beacon = model.Beacon{
  172. ID: id,
  173. }
  174. }
  175. ctx.Beacons.Lock.Unlock()
  176. beacon.IncomingJSON = adv
  177. beacon.LastSeen = now
  178. if beacon.BeaconMetrics == nil {
  179. beacon.BeaconMetrics = make([]model.BeaconMetric, 0, ctx.Settings.Settings.BeaconMetricSize)
  180. }
  181. metric := model.BeaconMetric{
  182. Distance: getBeaconDistance(adv),
  183. Timestamp: now,
  184. RSSI: int64(adv.RSSI),
  185. Location: adv.Hostname,
  186. }
  187. if len(beacon.BeaconMetrics) >= ctx.Settings.Settings.BeaconMetricSize {
  188. copy(beacon.BeaconMetrics, beacon.BeaconMetrics[1:])
  189. beacon.BeaconMetrics[ctx.Settings.Settings.BeaconMetricSize-1] = metric
  190. } else {
  191. beacon.BeaconMetrics = append(beacon.BeaconMetrics, metric)
  192. }
  193. ctx.Beacons.Lock.Lock()
  194. ctx.Beacons.Beacons[id] = beacon
  195. ctx.Beacons.Lock.Unlock()
  196. }
  197. func getBeaconDistance(adv model.BeaconAdvertisement) float64 {
  198. ratio := float64(adv.RSSI) * (1.0 / float64(twosComp(adv.TXPower)))
  199. distance := 100.0
  200. if ratio < 1.0 {
  201. distance = math.Pow(ratio, 10)
  202. } else {
  203. distance = (0.89976)*math.Pow(ratio, 7.7095) + 0.111
  204. }
  205. return distance
  206. }
  207. func twosComp(inp string) int64 {
  208. i, _ := strconv.ParseInt("0x"+inp, 0, 64)
  209. return i - 256
  210. }