25'ten fazla konu seçemezsiniz. Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.
 
 
 
 

107 satır
2.3 KiB

  1. package mqtthandler
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "os"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/AFASystems/presence/internal/pkg/model"
  12. "github.com/segmentio/kafka-go"
  13. )
  14. func MqttHandler(writer *kafka.Writer, topicName []byte, message []byte) {
  15. hostname := strings.Split(string(topicName), "/")[1]
  16. msgStr := string(message)
  17. if strings.HasPrefix(msgStr, "[") {
  18. var readings []model.RawReading
  19. err := json.Unmarshal(message, &readings)
  20. if err != nil {
  21. log.Printf("Error parsing JSON: %v", err)
  22. return
  23. }
  24. for _, reading := range readings {
  25. if reading.Type == "Gateway" {
  26. continue
  27. }
  28. adv := model.BeaconAdvertisement{
  29. Hostname: hostname,
  30. MAC: reading.MAC,
  31. RSSI: int64(reading.RSSI),
  32. Data: reading.RawData,
  33. HSButtonCounter: parseButtonState(reading.RawData),
  34. }
  35. encodedMsg, err := json.Marshal(adv)
  36. if err != nil {
  37. fmt.Println("Error in marshaling: ", err)
  38. break
  39. }
  40. msg := kafka.Message{
  41. Value: encodedMsg,
  42. }
  43. err = writer.WriteMessages(context.Background(), msg)
  44. if err != nil {
  45. fmt.Println("Error in writing to Kafka: ", err)
  46. time.Sleep(1 * time.Second)
  47. break
  48. }
  49. }
  50. } else {
  51. s := strings.Split(string(message), ",")
  52. if len(s) < 6 {
  53. log.Printf("Messaggio CSV non valido: %s", msgStr)
  54. return
  55. }
  56. rawdata := s[4]
  57. buttonCounter := parseButtonState(rawdata)
  58. if buttonCounter > 0 {
  59. adv := model.BeaconAdvertisement{}
  60. i, _ := strconv.ParseInt(s[3], 10, 64)
  61. adv.Hostname = hostname
  62. adv.BeaconType = "hb_button"
  63. adv.MAC = s[1]
  64. adv.RSSI = i
  65. adv.Data = rawdata
  66. adv.HSButtonCounter = buttonCounter
  67. read_line := strings.TrimRight(string(s[5]), "\r\n")
  68. it, err33 := strconv.Atoi(read_line)
  69. if err33 != nil {
  70. fmt.Println(it)
  71. fmt.Println(err33)
  72. os.Exit(2)
  73. }
  74. }
  75. }
  76. }
  77. func parseButtonState(raw string) int64 {
  78. raw = strings.ToUpper(raw)
  79. if strings.HasPrefix(raw, "0201060303E1FF12") && len(raw) >= 38 {
  80. buttonField := raw[34:38]
  81. if buttonValue, err := strconv.ParseInt(buttonField, 16, 64); err == nil {
  82. return buttonValue
  83. }
  84. }
  85. if strings.HasPrefix(raw, "02010612FF590") && len(raw) >= 24 {
  86. counterField := raw[22:24]
  87. buttonState, err := strconv.ParseInt(counterField, 16, 64)
  88. if err == nil {
  89. return buttonState
  90. }
  91. }
  92. return 0
  93. }