No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.
 
 
 
 

224 líneas
4.5 KiB

  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "encoding/hex"
  7. "encoding/json"
  8. "fmt"
  9. "strings"
  10. "github.com/AFASystems/presence/internal/pkg/config"
  11. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  12. "github.com/AFASystems/presence/internal/pkg/model"
  13. "github.com/segmentio/kafka-go"
  14. )
  15. func main() {
  16. // Load global context to init beacons and latest list
  17. appCtx := model.AppContext{
  18. Beacons: model.BeaconsList{
  19. Beacons: make(map[string]model.Beacon),
  20. },
  21. Settings: model.Settings{
  22. Settings: model.SettingsVal{
  23. LocationConfidence: 4,
  24. LastSeenThreshold: 15,
  25. BeaconMetricSize: 30,
  26. HASendInterval: 5,
  27. HASendChangesOnly: false,
  28. },
  29. },
  30. BeaconEvents: model.BeaconEventList{
  31. Beacons: make(map[string]model.BeaconEvent),
  32. },
  33. BeaconsLookup: make(map[string]struct{}),
  34. }
  35. cfg := config.Load()
  36. // Kafka reader for Raw MQTT beacons
  37. rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw")
  38. defer rawReader.Close()
  39. // Kafka reader for API server updates
  40. apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api")
  41. defer apiReader.Close()
  42. alertWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "alertbeacons")
  43. defer alertWriter.Close()
  44. fmt.Println("Decoder initialized, subscribed to Kafka topics")
  45. chRaw := make(chan model.BeaconAdvertisement, 2000)
  46. chApi := make(chan model.ApiUpdate, 2000)
  47. go kafkaclient.Consume(rawReader, chRaw)
  48. go kafkaclient.Consume(apiReader, chApi)
  49. for {
  50. select {
  51. case msg := <-chRaw:
  52. processIncoming(msg, &appCtx, alertWriter)
  53. case msg := <-chApi:
  54. switch msg.Method {
  55. case "POST":
  56. id := msg.Beacon.ID
  57. appCtx.BeaconsLookup[id] = struct{}{}
  58. case "DELETE":
  59. fmt.Println("Incoming delete message")
  60. }
  61. }
  62. }
  63. }
  64. func processIncoming(adv model.BeaconAdvertisement, ctx *model.AppContext, writer *kafka.Writer) {
  65. id := adv.MAC
  66. _, ok := ctx.BeaconsLookup[id]
  67. if !ok {
  68. return
  69. }
  70. err := decodeBeacon(adv, ctx, writer)
  71. if err != nil {
  72. fmt.Println("error in decoding")
  73. return
  74. }
  75. }
  76. func decodeBeacon(adv model.BeaconAdvertisement, ctx *model.AppContext, writer *kafka.Writer) error {
  77. beacon := strings.TrimSpace(adv.Data)
  78. id := adv.MAC
  79. if beacon == "" {
  80. return nil // How to return error?, do I even need to return error
  81. }
  82. b, err := hex.DecodeString(beacon)
  83. if err != nil {
  84. return err
  85. }
  86. // check for flag byte, if first AD structure is flag bytes, remove it
  87. if len(b) > 1 && b[1] == 0x01 {
  88. l := int(b[0]) // length of AD structure
  89. if 1+l <= len(b) {
  90. b = b[1+l:]
  91. }
  92. }
  93. adStructureIndeces := ParseADFast(b)
  94. event := model.BeaconEvent{}
  95. for _, r := range adStructureIndeces {
  96. ad := b[r[0]:r[1]]
  97. if checkIngics(ad) {
  98. event = parseIngicsState(ad)
  99. event.ID = id
  100. event.Name = id
  101. break
  102. } else if checkEddystoneTLM(ad) {
  103. event = parseEddystoneState(ad)
  104. event.ID = id
  105. event.Name = id
  106. break
  107. } else if checkMinewB7(ad) {
  108. fmt.Println("Minew B7 vendor format")
  109. break
  110. }
  111. }
  112. if event.ID != "" {
  113. prevEvent, ok := ctx.BeaconEvents.Beacons[id]
  114. ctx.BeaconEvents.Beacons[id] = event
  115. if ok && bytes.Equal(prevEvent.Hash(), event.Hash()) {
  116. return nil
  117. }
  118. eMsg, err := json.Marshal(event)
  119. if err != nil {
  120. return err
  121. }
  122. err = writer.WriteMessages(context.Background(), kafka.Message{
  123. Value: eMsg,
  124. })
  125. if err != nil {
  126. return err
  127. }
  128. fmt.Println("Message sent")
  129. }
  130. return nil
  131. }
  // checkIngics reports whether ad is an Ingics manufacturer-specific AD
  // structure that parseIngicsState can decode.
  //
  // ad[0] is the AD length byte and ad[1:6] must equal FF 59 00 80 BC. At least
  // nine bytes are required because parseIngicsState reads ad[6:8] and ad[8];
  // a shorter frame that merely carries the signature is rejected here instead
  // of causing an index-out-of-range panic during parsing.
  func checkIngics(ad []byte) bool {
  	// Length byte + 5 signature bytes + 2 battery bytes + 1 event byte.
  	const minLen = 9
  	if len(ad) < minLen {
  		return false
  	}

  	return ad[1] == 0xFF &&
  		ad[2] == 0x59 &&
  		ad[3] == 0x00 &&
  		ad[4] == 0x80 &&
  		ad[5] == 0xBC
  }
  143. func parseIngicsState(ad []byte) model.BeaconEvent {
  144. return model.BeaconEvent{
  145. Battery: uint32(binary.LittleEndian.Uint16(ad[6:8])),
  146. Event: int(ad[8]),
  147. Type: "Ingics",
  148. }
  149. }
  // checkEddystoneTLM reports whether ad is an Eddystone TLM service-data AD
  // structure that parseEddystoneState can decode.
  //
  // ad[0] is the AD length byte, ad[1] must be 0x16, ad[2:4] must be AA FE and
  // ad[4] must be 0x20. At least eight bytes are required: the frame-type check
  // itself reads ad[4], and parseEddystoneState reads the battery value from
  // ad[6:8]. Shorter structures are rejected instead of indexing out of range.
  func checkEddystoneTLM(ad []byte) bool {
  	// Length byte + type + 2 UUID bytes + frame type + version + 2 battery bytes.
  	const minLen = 8
  	if len(ad) < minLen {
  		return false
  	}

  	return ad[1] == 0x16 &&
  		ad[2] == 0xAA &&
  		ad[3] == 0xFE &&
  		ad[4] == 0x20
  }
  160. func parseEddystoneState(ad []byte) model.BeaconEvent {
  161. return model.BeaconEvent{
  162. Battery: uint32(binary.BigEndian.Uint16(ad[6:8])),
  163. Type: "Eddystone",
  164. }
  165. }
  // checkMinewB7 reports whether ad carries the byte pattern currently used to
  // recognise a Minew B7 frame: ad[1] == 0x16, ad[2] == 0xE1 and ad[3] == 0xFF,
  // with at least four bytes present.
  //
  // NOTE(review): per the original author this pattern is probably not a
  // reliable B7 signature in every case; it is considered good enough for
  // testing.
  func checkMinewB7(ad []byte) bool {
  	if len(ad) < 4 {
  		return false
  	}
  	return ad[1] == 0x16 && ad[2] == 0xE1 && ad[3] == 0xFF
  }
  // ParseADFast splits b into length-prefixed AD structures without copying.
  //
  // Each structure is one length byte followed by that many bytes. The result
  // holds one [start, end) index pair per structure, with start pointing at the
  // length byte. Scanning stops at the first zero length byte or at a structure
  // whose declared length would run past the end of b; the pairs collected up
  // to that point are returned (nil when there are none).
  func ParseADFast(b []byte) [][2]int {
  	var spans [][2]int

  	for start := 0; start < len(b); {
  		end := start + 1 + int(b[start])
  		// end == start+1 means a zero length byte; end > len(b) means truncated.
  		if end == start+1 || end > len(b) {
  			break
  		}
  		spans = append(spans, [2]int{start, end})
  		start = end
  	}

  	return spans
  }
  188. }