您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 

223 行
5.5 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "math"
  7. "strconv"
  8. "time"
  9. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  10. "github.com/AFASystems/presence/internal/pkg/config"
  11. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  12. "github.com/AFASystems/presence/internal/pkg/model"
  13. "github.com/segmentio/kafka-go"
  14. )
  15. func main() {
  16. // Load global context to init beacons and latest list
  17. appState := appcontext.NewAppState()
  18. cfg := config.Load()
  19. // Kafka reader for Raw MQTT beacons
  20. rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc")
  21. defer rawReader.Close()
  22. // Kafka reader for API server updates
  23. apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api-loc")
  24. defer apiReader.Close()
  25. writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "locevents")
  26. defer writer.Close()
  27. fmt.Println("Locations algorithm initialized, subscribed to Kafka topics")
  28. locTicker := time.NewTicker(1 * time.Second)
  29. defer locTicker.Stop()
  30. chRaw := make(chan model.BeaconAdvertisement, 2000)
  31. chApi := make(chan model.ApiUpdate, 2000)
  32. go kafkaclient.Consume(rawReader, chRaw)
  33. go kafkaclient.Consume(apiReader, chApi)
  34. for {
  35. select {
  36. case <-locTicker.C:
  37. getLikelyLocations(appState, writer)
  38. case msg := <-chRaw:
  39. assignBeaconToList(msg, appState)
  40. case msg := <-chApi:
  41. switch msg.Method {
  42. case "POST":
  43. id := msg.Beacon.ID
  44. appState.AddBeaconToLookup(id)
  45. case "DELETE":
  46. fmt.Println("Incoming delete message")
  47. }
  48. }
  49. }
  50. }
  51. func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {
  52. fmt.Println("get likely locations called")
  53. beacons := appState.GetAllBeacons()
  54. settings := appState.GetSettingsValue()
  55. for _, beacon := range beacons {
  56. // Shrinking the model because other properties have nothing to do with the location
  57. r := model.HTTPLocation{
  58. Method: "Standard",
  59. Distance: 999,
  60. ID: beacon.ID,
  61. Location: "",
  62. LastSeen: 999,
  63. }
  64. mSize := len(beacon.BeaconMetrics)
  65. if (int64(time.Now().Unix()) - (beacon.BeaconMetrics[mSize-1].Timestamp)) > settings.LastSeenThreshold {
  66. continue
  67. }
  68. locList := make(map[string]float64)
  69. seenW := 1.5
  70. rssiW := 0.75
  71. for _, metric := range beacon.BeaconMetrics {
  72. res := seenW + (rssiW * (1.0 - (float64(metric.RSSI) / -100.0)))
  73. locList[metric.Location] += res
  74. }
  75. bestLocName := ""
  76. maxScore := 0.0
  77. for locName, score := range locList {
  78. if score > maxScore {
  79. maxScore = score
  80. bestLocName = locName
  81. }
  82. }
  83. if bestLocName == beacon.PreviousLocation {
  84. beacon.LocationConfidence++
  85. } else {
  86. beacon.LocationConfidence = 0
  87. }
  88. r.Distance = beacon.BeaconMetrics[mSize-1].Distance
  89. r.Location = bestLocName
  90. r.LastSeen = beacon.BeaconMetrics[mSize-1].Timestamp
  91. if beacon.LocationConfidence == settings.LocationConfidence && beacon.PreviousConfidentLocation != bestLocName {
  92. beacon.LocationConfidence = 0
  93. // Who do I need this if I am sending entire structure anyways? who knows
  94. js, err := json.Marshal(model.LocationChange{
  95. Method: "LocationChange",
  96. BeaconRef: beacon,
  97. Name: beacon.Name,
  98. PreviousLocation: beacon.PreviousConfidentLocation,
  99. NewLocation: bestLocName,
  100. Timestamp: time.Now().Unix(),
  101. })
  102. if err != nil {
  103. beacon.PreviousConfidentLocation = bestLocName
  104. beacon.PreviousLocation = bestLocName
  105. appState.UpdateBeacon(beacon.ID, beacon)
  106. continue
  107. }
  108. msg := kafka.Message{
  109. Value: js,
  110. }
  111. err = writer.WriteMessages(context.Background(), msg)
  112. if err != nil {
  113. fmt.Println("Error in sending Kafka message")
  114. }
  115. }
  116. beacon.PreviousLocation = bestLocName
  117. appState.UpdateBeacon(beacon.ID, beacon)
  118. js, err := json.Marshal(r)
  119. if err != nil {
  120. continue
  121. }
  122. msg := kafka.Message{
  123. Value: js,
  124. }
  125. err = writer.WriteMessages(context.Background(), msg)
  126. if err != nil {
  127. fmt.Println("Error in sending Kafka message")
  128. }
  129. }
  130. }
  131. func assignBeaconToList(adv model.BeaconAdvertisement, appState *appcontext.AppState) {
  132. id := adv.MAC
  133. ok := appState.BeaconExists(id)
  134. now := time.Now().Unix()
  135. if !ok {
  136. appState.UpdateLatestBeacon(id, model.Beacon{ID: id, BeaconType: adv.BeaconType, LastSeen: now, IncomingJSON: adv, BeaconLocation: adv.Hostname, Distance: getBeaconDistance(adv)})
  137. return
  138. }
  139. settings := appState.GetSettingsValue()
  140. fmt.Println("RSSI: ", adv.RSSI)
  141. if settings.RSSIEnforceThreshold && (int64(adv.RSSI) < settings.RSSIMinThreshold) {
  142. return
  143. }
  144. beacon, ok := appState.GetBeacon(id)
  145. if !ok {
  146. beacon = model.Beacon{
  147. ID: id,
  148. }
  149. }
  150. beacon.IncomingJSON = adv
  151. beacon.LastSeen = now
  152. if beacon.BeaconMetrics == nil {
  153. beacon.BeaconMetrics = make([]model.BeaconMetric, 0, settings.BeaconMetricSize)
  154. }
  155. metric := model.BeaconMetric{
  156. Distance: getBeaconDistance(adv),
  157. Timestamp: now,
  158. RSSI: int64(adv.RSSI),
  159. Location: adv.Hostname,
  160. }
  161. if len(beacon.BeaconMetrics) >= settings.BeaconMetricSize {
  162. copy(beacon.BeaconMetrics, beacon.BeaconMetrics[1:])
  163. beacon.BeaconMetrics[settings.BeaconMetricSize-1] = metric
  164. } else {
  165. beacon.BeaconMetrics = append(beacon.BeaconMetrics, metric)
  166. }
  167. appState.UpdateBeacon(id, beacon)
  168. }
  169. func getBeaconDistance(adv model.BeaconAdvertisement) float64 {
  170. ratio := float64(adv.RSSI) * (1.0 / float64(twosComp(adv.TXPower)))
  171. distance := 100.0
  172. if ratio < 1.0 {
  173. distance = math.Pow(ratio, 10)
  174. } else {
  175. distance = (0.89976)*math.Pow(ratio, 7.7095) + 0.111
  176. }
  177. return distance
  178. }
  179. func twosComp(inp string) int64 {
  180. i, _ := strconv.ParseInt("0x"+inp, 0, 64)
  181. return i - 256
  182. }