您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

58 行
1.2 KiB

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/AFASystems/presence/internal/pkg/bridge/mqtthandler"
  5. "github.com/AFASystems/presence/internal/pkg/config"
  6. "github.com/AFASystems/presence/internal/pkg/kafka"
  7. "github.com/yosssi/gmq/mqtt"
  8. "github.com/yosssi/gmq/mqtt/client"
  9. )
  10. func main() {
  11. cfg := config.Load()
  12. cli := client.New(&client.Options{
  13. ErrorHandler: func(err error) {
  14. fmt.Println("Error in initiating MQTT client: ", err)
  15. },
  16. })
  17. defer cli.Terminate()
  18. err := cli.Connect(&client.ConnectOptions{
  19. Network: "tcp",
  20. Address: cfg.MQTTHost,
  21. ClientID: []byte(cfg.MQTTClientID),
  22. UserName: []byte(cfg.MQTTUser),
  23. Password: []byte(cfg.MQTTPass),
  24. })
  25. if err != nil {
  26. fmt.Println("Could not connect to MQTT broker")
  27. panic(err)
  28. }
  29. fmt.Println("Successfuly connected to MQTT broker")
  30. writer := kafka.KafkaWriter(cfg.KafkaURL, "rawbeacons")
  31. defer writer.Close()
  32. err = cli.Subscribe(&client.SubscribeOptions{
  33. SubReqs: []*client.SubReq{
  34. {
  35. TopicFilter: []byte("publish_out/#"),
  36. QoS: mqtt.QoS0,
  37. Handler: func(topicName, message []byte) {
  38. mqtthandler.MqttHandler(writer, topicName, message)
  39. },
  40. },
  41. },
  42. })
  43. if err != nil {
  44. panic(err)
  45. }
  46. select {}
  47. }