You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

108 regels
2.5 KiB

  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "os"
  7. "os/signal"
  8. "strconv"
  9. "strings"
  10. "time"
  11. //"./utils"
  12. "github.com/yosssi/gmq/mqtt"
  13. "github.com/yosssi/gmq/mqtt/client"
  14. )
  15. func main() {
  16. sigc := make(chan os.Signal, 1)
  17. signal.Notify(sigc, os.Interrupt, os.Kill)
  18. incoming_updates_chan := IncomingMQTTProcessor(1*time.Second, cli, db, loggers)
  19. err = cli.Subscribe(&client.SubscribeOptions{
  20. SubReqs: []*client.SubReq{
  21. &client.SubReq{
  22. TopicFilter: []byte("publish_out/#"),
  23. QoS: mqtt.QoS0,
  24. Handler: func(topicName, message []byte) {
  25. msgStr := string(message)
  26. t := strings.Split(string(topicName), "/")
  27. hostname := t[1]
  28. if strings.HasPrefix(msgStr, "[") {
  29. var readings []RawReading
  30. err := json.Unmarshal(message, &readings)
  31. if err != nil {
  32. log.Printf("Errore parsing JSON: %v", err)
  33. return
  34. }
  35. for _, reading := range readings {
  36. if reading.Type == "Gateway" {
  37. continue
  38. }
  39. incoming := Incoming_json{
  40. Hostname: hostname,
  41. MAC: reading.MAC,
  42. RSSI: int64(reading.RSSI),
  43. Data: reading.RawData,
  44. HB_ButtonCounter: parseButtonState(reading.RawData),
  45. }
  46. incoming_updates_chan <- incoming
  47. }
  48. } else {
  49. s := strings.Split(string(message), ",")
  50. if len(s) < 6 {
  51. log.Printf("Messaggio CSV non valido: %s", msgStr)
  52. return
  53. }
  54. rawdata := s[4]
  55. buttonCounter := parseButtonState(rawdata)
  56. if buttonCounter > 0 {
  57. incoming := Incoming_json{}
  58. i, _ := strconv.ParseInt(s[3], 10, 64)
  59. incoming.Hostname = hostname
  60. incoming.Beacon_type = "hb_button"
  61. incoming.MAC = s[1]
  62. incoming.RSSI = i
  63. incoming.Data = rawdata
  64. incoming.HB_ButtonCounter = buttonCounter
  65. read_line := strings.TrimRight(string(s[5]), "\r\n")
  66. it, err33 := strconv.Atoi(read_line)
  67. if err33 != nil {
  68. fmt.Println(it)
  69. fmt.Println(err33)
  70. os.Exit(2)
  71. }
  72. incoming_updates_chan <- incoming
  73. }
  74. }
  75. },
  76. },
  77. },
  78. })
  79. if err != nil {
  80. panic(err)
  81. }
  82. fmt.Println("CONNECTED TO MQTT")
  83. fmt.Println("\n ")
  84. fmt.Println("Visit http://" + *http_host_path_ptr + " on your browser to see the web interface")
  85. fmt.Println("\n ")
  86. go startServer()
  87. // Wait for receiving a signal.
  88. <-sigc
  89. // Disconnect the Network Connection.
  90. if err := cli.Disconnect(); err != nil {
  91. panic(err)
  92. }
  93. }