You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

108 lines
2.4 KiB

  1. package mqtthandler
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "os"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/AFASystems/presence/internal/pkg/model"
  12. "github.com/segmentio/kafka-go"
  13. )
  14. func MqttHandler(writer *kafka.Writer, topicName []byte, message []byte) {
  15. hostname := strings.Split(string(topicName), "/")[1]
  16. msgStr := string(message)
  17. if strings.HasPrefix(msgStr, "[") {
  18. var readings []model.RawReading
  19. err := json.Unmarshal(message, &readings)
  20. if err != nil {
  21. log.Printf("Error parsing JSON: %v", err)
  22. return
  23. }
  24. for _, reading := range readings {
  25. if reading.Type == "Gateway" {
  26. continue
  27. }
  28. incoming := model.Incoming_json{
  29. Hostname: hostname,
  30. MAC: reading.MAC,
  31. RSSI: int64(reading.RSSI),
  32. Data: reading.RawData,
  33. HB_ButtonCounter: parseButtonState(reading.RawData),
  34. }
  35. encodedMsg, err := json.Marshal(incoming)
  36. if err != nil {
  37. fmt.Println("Error in marshaling: ", err)
  38. }
  39. msg := kafka.Message{
  40. Value: encodedMsg,
  41. }
  42. err = writer.WriteMessages(context.Background(), msg)
  43. if err != nil {
  44. fmt.Println("Error in writing to Kafka: ", err)
  45. time.Sleep(1 * time.Second)
  46. break
  47. }
  48. fmt.Println("message sent: ", time.Now())
  49. }
  50. } else {
  51. s := strings.Split(string(message), ",")
  52. if len(s) < 6 {
  53. log.Printf("Messaggio CSV non valido: %s", msgStr)
  54. return
  55. }
  56. rawdata := s[4]
  57. buttonCounter := parseButtonState(rawdata)
  58. if buttonCounter > 0 {
  59. incoming := model.Incoming_json{}
  60. i, _ := strconv.ParseInt(s[3], 10, 64)
  61. incoming.Hostname = hostname
  62. incoming.Beacon_type = "hb_button"
  63. incoming.MAC = s[1]
  64. incoming.RSSI = i
  65. incoming.Data = rawdata
  66. incoming.HB_ButtonCounter = buttonCounter
  67. read_line := strings.TrimRight(string(s[5]), "\r\n")
  68. it, err33 := strconv.Atoi(read_line)
  69. if err33 != nil {
  70. fmt.Println(it)
  71. fmt.Println(err33)
  72. os.Exit(2)
  73. }
  74. }
  75. }
  76. }
  77. func parseButtonState(raw string) int64 {
  78. raw = strings.ToUpper(raw)
  79. if strings.HasPrefix(raw, "0201060303E1FF12") && len(raw) >= 38 {
  80. buttonField := raw[34:38]
  81. if buttonValue, err := strconv.ParseInt(buttonField, 16, 64); err == nil {
  82. return buttonValue
  83. }
  84. }
  85. if strings.HasPrefix(raw, "02010612FF590") && len(raw) >= 24 {
  86. counterField := raw[22:24]
  87. buttonState, err := strconv.ParseInt(counterField, 16, 64)
  88. if err == nil {
  89. return buttonState
  90. }
  91. }
  92. return 0
  93. }