|
- package main
-
- import (
- "context"
- "encoding/json"
- "fmt"
- "log"
- "log/slog"
- "os/signal"
- "strings"
- "sync"
- "syscall"
- "time"
-
- "github.com/AFASystems/presence/internal/pkg/common/appcontext"
- "github.com/AFASystems/presence/internal/pkg/config"
- "github.com/AFASystems/presence/internal/pkg/kafkaclient"
- "github.com/AFASystems/presence/internal/pkg/logger"
- "github.com/AFASystems/presence/internal/pkg/model"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "github.com/segmentio/kafka-go"
- )
-
- var wg sync.WaitGroup
-
- func mqtthandler(writer *kafka.Writer, topic string, message []byte, appState *appcontext.AppState) {
- hostname := strings.Split(topic, "/")[1]
- msgStr := string(message)
-
- if strings.HasPrefix(msgStr, "[") {
- var readings []model.RawReading
- err := json.Unmarshal(message, &readings)
- if err != nil {
- log.Printf("Error parsing JSON: %v", err)
- return
- }
-
- for _, reading := range readings {
- if reading.Type == "Gateway" {
- continue
- }
-
- val, ok := appState.BeaconExists(reading.MAC)
- // fmt.Printf("reading: %+v\n", reading)
- if !ok {
- continue
- }
-
- adv := model.BeaconAdvertisement{
- ID: val,
- Hostname: hostname,
- MAC: reading.MAC,
- RSSI: int64(reading.RSSI),
- Data: reading.RawData,
- }
-
- encodedMsg, err := json.Marshal(adv)
- if err != nil {
- fmt.Println("Error in marshaling: ", err)
- break
- }
-
- msg := kafka.Message{
- Value: encodedMsg,
- }
-
- err = writer.WriteMessages(context.Background(), msg)
- if err != nil {
- fmt.Println("Error in writing to Kafka: ", err)
- time.Sleep(1 * time.Second)
- break
- }
- }
- } else {
- s := strings.Split(string(message), ",")
- if len(s) < 6 {
- log.Printf("Messaggio CSV non valido: %s", msgStr)
- return
- }
-
- fmt.Println("this gateway is also sending data: ", s)
- }
- }
-
- var messagePubHandler = func(msg mqtt.Message, writer *kafka.Writer, appState *appcontext.AppState) {
- mqtthandler(writer, msg.Topic(), msg.Payload(), appState)
- }
-
- var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
- fmt.Println("Connected")
- }
-
- var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
- fmt.Printf("Connect lost: %v", err)
- }
-
- func main() {
- // Load global context to init beacons and latest list
- appState := appcontext.NewAppState()
- cfg := config.Load()
- kafkaManager := kafkaclient.InitKafkaManager()
-
- // Set logger -> terminal and log file
- slog.SetDefault(logger.CreateLogger("bridge.log"))
-
- // define context
- ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
- defer stop()
-
- readerTopics := []string{"apibeacons", "alert", "mqtt"}
- kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "bridge", readerTopics)
-
- writerTopics := []string{"rawbeacons"}
- kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)
-
- slog.Info("Bridge initialized, subscribed to kafka topics")
-
- chApi := make(chan model.ApiUpdate, 200)
- chAlert := make(chan model.Alert, 200)
- chMqtt := make(chan []model.Tracker, 200)
-
- wg.Add(3)
- go kafkaclient.Consume(kafkaManager.GetReader("apibeacons"), chApi, ctx, &wg)
- go kafkaclient.Consume(kafkaManager.GetReader("alert"), chAlert, ctx, &wg)
- go kafkaclient.Consume(kafkaManager.GetReader("mqtt"), chMqtt, ctx, &wg)
-
- opts := mqtt.NewClientOptions()
- opts.AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.MQTTHost, 1883))
- opts.SetClientID("go_mqtt_client")
- opts.SetAutoReconnect(true)
- opts.SetConnectRetry(true)
- opts.SetConnectRetryInterval(1 * time.Second)
- opts.SetMaxReconnectInterval(600 * time.Second)
- opts.SetCleanSession(false)
-
- opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
- messagePubHandler(m, kafkaManager.GetWriter("rawbeacons"), appState)
- })
- opts.OnConnect = connectHandler
- opts.OnConnectionLost = connectLostHandler
- client := mqtt.NewClient(opts)
- if token := client.Connect(); token.Wait() && token.Error() != nil {
- panic(token.Error())
- }
-
- sub(client)
-
- eventloop:
- for {
- select {
- case <-ctx.Done():
- break eventloop
- case msg := <-chApi:
- switch msg.Method {
- case "POST":
- id := msg.ID
- appState.AddBeaconToLookup(msg.MAC, id)
- lMsg := fmt.Sprintf("Beacon added to lookup: %s", id)
- slog.Info(lMsg)
- case "DELETE":
- id := msg.MAC
- if id == "all" {
- appState.CleanLookup()
- fmt.Println("cleaned up lookup map")
- continue
- }
- appState.RemoveBeaconFromLookup(id)
- lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id)
- slog.Info(lMsg)
- }
- case msg := <-chAlert:
- p, err := json.Marshal(msg)
- if err != nil {
- continue
- }
- client.Publish("/alerts", 0, true, p)
- case msg := <-chMqtt:
- p, err := json.Marshal(msg)
- if err != nil {
- continue
- }
- client.Publish("/trackers", 0, true, p)
- }
- }
-
- slog.Info("broken out of the main event loop")
- wg.Wait()
-
- slog.Info("All go routines have stopped, Beggining to close Kafka connections")
- kafkaManager.CleanKafkaReaders()
- kafkaManager.CleanKafkaWriters()
-
- client.Disconnect(250)
- slog.Info("Closing connection to MQTT broker")
- }
-
- func sub(client mqtt.Client) {
- topic := "publish_out/#"
- token := client.Subscribe(topic, 1, nil)
- token.Wait()
- fmt.Printf("Subscribed to topic: %s\n", topic)
- }
|