您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 

205 行
5.1 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "time"
  7. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  8. "github.com/AFASystems/presence/internal/pkg/common/utils"
  9. "github.com/AFASystems/presence/internal/pkg/config"
  10. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  11. "github.com/AFASystems/presence/internal/pkg/model"
  12. "github.com/segmentio/kafka-go"
  13. )
  14. func main() {
  15. // Load global context to init beacons and latest list
  16. appState := appcontext.NewAppState()
  17. cfg := config.Load()
  18. // Kafka reader for Raw MQTT beacons
  19. rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc")
  20. defer rawReader.Close()
  21. // Kafka reader for API server updates
  22. apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api-loc")
  23. defer apiReader.Close()
  24. writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "locevents")
  25. defer writer.Close()
  26. fmt.Println("Locations algorithm initialized, subscribed to Kafka topics")
  27. locTicker := time.NewTicker(1 * time.Second)
  28. defer locTicker.Stop()
  29. chRaw := make(chan model.BeaconAdvertisement, 2000)
  30. chApi := make(chan model.ApiUpdate, 2000)
  31. go kafkaclient.Consume(rawReader, chRaw)
  32. go kafkaclient.Consume(apiReader, chApi)
  33. for {
  34. select {
  35. case <-locTicker.C:
  36. getLikelyLocations(appState, writer)
  37. case msg := <-chRaw:
  38. assignBeaconToList(msg, appState)
  39. case msg := <-chApi:
  40. switch msg.Method {
  41. case "POST":
  42. id := msg.Beacon.ID
  43. appState.AddBeaconToLookup(id)
  44. case "DELETE":
  45. fmt.Println("Incoming delete message")
  46. }
  47. }
  48. }
  49. }
  50. func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {
  51. fmt.Println("get likely locations called")
  52. beacons := appState.GetAllBeacons()
  53. settings := appState.GetSettingsValue()
  54. for _, beacon := range beacons {
  55. // Shrinking the model because other properties have nothing to do with the location
  56. r := model.HTTPLocation{
  57. Method: "Standard",
  58. Distance: 999,
  59. ID: beacon.ID,
  60. Location: "",
  61. LastSeen: 999,
  62. }
  63. mSize := len(beacon.BeaconMetrics)
  64. if (int64(time.Now().Unix()) - (beacon.BeaconMetrics[mSize-1].Timestamp)) > settings.LastSeenThreshold {
  65. continue
  66. }
  67. locList := make(map[string]float64)
  68. seenW := 1.5
  69. rssiW := 0.75
  70. for _, metric := range beacon.BeaconMetrics {
  71. res := seenW + (rssiW * (1.0 - (float64(metric.RSSI) / -100.0)))
  72. locList[metric.Location] += res
  73. }
  74. bestLocName := ""
  75. maxScore := 0.0
  76. for locName, score := range locList {
  77. if score > maxScore {
  78. maxScore = score
  79. bestLocName = locName
  80. }
  81. }
  82. if bestLocName == beacon.PreviousLocation {
  83. beacon.LocationConfidence++
  84. } else {
  85. beacon.LocationConfidence = 0
  86. }
  87. r.Distance = beacon.BeaconMetrics[mSize-1].Distance
  88. r.Location = bestLocName
  89. r.LastSeen = beacon.BeaconMetrics[mSize-1].Timestamp
  90. if beacon.LocationConfidence == settings.LocationConfidence && beacon.PreviousConfidentLocation != bestLocName {
  91. beacon.LocationConfidence = 0
  92. // Who do I need this if I am sending entire structure anyways? who knows
  93. js, err := json.Marshal(model.LocationChange{
  94. Method: "LocationChange",
  95. BeaconRef: beacon,
  96. Name: beacon.Name,
  97. PreviousLocation: beacon.PreviousConfidentLocation,
  98. NewLocation: bestLocName,
  99. Timestamp: time.Now().Unix(),
  100. })
  101. if err != nil {
  102. beacon.PreviousConfidentLocation = bestLocName
  103. beacon.PreviousLocation = bestLocName
  104. appState.UpdateBeacon(beacon.ID, beacon)
  105. continue
  106. }
  107. msg := kafka.Message{
  108. Value: js,
  109. }
  110. err = writer.WriteMessages(context.Background(), msg)
  111. if err != nil {
  112. fmt.Println("Error in sending Kafka message")
  113. }
  114. }
  115. beacon.PreviousLocation = bestLocName
  116. appState.UpdateBeacon(beacon.ID, beacon)
  117. js, err := json.Marshal(r)
  118. if err != nil {
  119. continue
  120. }
  121. msg := kafka.Message{
  122. Value: js,
  123. }
  124. err = writer.WriteMessages(context.Background(), msg)
  125. if err != nil {
  126. fmt.Println("Error in sending Kafka message")
  127. }
  128. }
  129. }
  130. func assignBeaconToList(adv model.BeaconAdvertisement, appState *appcontext.AppState) {
  131. id := adv.MAC
  132. ok := appState.BeaconExists(id)
  133. now := time.Now().Unix()
  134. if !ok {
  135. appState.UpdateLatestBeacon(id, model.Beacon{ID: id, BeaconType: adv.BeaconType, LastSeen: now, IncomingJSON: adv, BeaconLocation: adv.Hostname, Distance: utils.CalculateDistance(adv)})
  136. return
  137. }
  138. settings := appState.GetSettingsValue()
  139. if settings.RSSIEnforceThreshold && (int64(adv.RSSI) < settings.RSSIMinThreshold) {
  140. return
  141. }
  142. beacon, ok := appState.GetBeacon(id)
  143. if !ok {
  144. beacon = model.Beacon{
  145. ID: id,
  146. }
  147. }
  148. beacon.IncomingJSON = adv
  149. beacon.LastSeen = now
  150. if beacon.BeaconMetrics == nil {
  151. beacon.BeaconMetrics = make([]model.BeaconMetric, 0, settings.BeaconMetricSize)
  152. }
  153. metric := model.BeaconMetric{
  154. Distance: utils.CalculateDistance(adv),
  155. Timestamp: now,
  156. RSSI: int64(adv.RSSI),
  157. Location: adv.Hostname,
  158. }
  159. if len(beacon.BeaconMetrics) >= settings.BeaconMetricSize {
  160. copy(beacon.BeaconMetrics, beacon.BeaconMetrics[1:])
  161. beacon.BeaconMetrics[settings.BeaconMetricSize-1] = metric
  162. } else {
  163. beacon.BeaconMetrics = append(beacon.BeaconMetrics, metric)
  164. }
  165. appState.UpdateBeacon(id, beacon)
  166. }