package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"log/slog"
	"os"
	"os/signal"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/AFASystems/presence/internal/pkg/common/appcontext"
	"github.com/AFASystems/presence/internal/pkg/config"
	"github.com/AFASystems/presence/internal/pkg/kafkaclient"
	"github.com/AFASystems/presence/internal/pkg/model"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"github.com/segmentio/kafka-go"
)

var wg sync.WaitGroup

// mqtthandler parses a raw MQTT payload, filters each reading against the
// beacon lookup table and forwards known beacon advertisements to Kafka.
func mqtthandler(writer *kafka.Writer, topic string, message []byte, appState *appcontext.AppState) {
	hostname := strings.Split(topic, "/")[1]
	msgStr := string(message)
	if strings.HasPrefix(msgStr, "[") {
		var readings []model.RawReading
		err := json.Unmarshal(message, &readings)
		if err != nil {
			log.Printf("Error parsing JSON: %v", err)
			return
		}
		for _, reading := range readings {
			if reading.Type == "Gateway" {
				continue
			}
			val, ok := appState.BeaconExists(reading.MAC)
			// fmt.Printf("reading: %+v\n", reading)
			if !ok {
				continue
			}
			adv := model.BeaconAdvertisement{
				ID:       val,
				Hostname: hostname,
				MAC:      reading.MAC,
				RSSI:     int64(reading.RSSI),
				Data:     reading.RawData,
			}
			encodedMsg, err := json.Marshal(adv)
			if err != nil {
				fmt.Println("Error in marshaling: ", err)
				break
			}
			msg := kafka.Message{
				Value: encodedMsg,
			}
			err = writer.WriteMessages(context.Background(), msg)
			if err != nil {
				fmt.Println("Error in writing to Kafka: ", err)
				time.Sleep(1 * time.Second)
				break
			}
		}
	}
	// } else {
	// 	s := strings.Split(string(message), ",")
	// 	if len(s) < 6 {
	// 		log.Printf("Invalid CSV message: %s", msgStr)
	// 		return
	// 	}
	// 	rawdata := s[4]
	// 	buttonCounter := parseButtonState(rawdata)
	// 	if buttonCounter > 0 {
	// 		adv := model.BeaconAdvertisement{}
	// 		i, _ := strconv.ParseInt(s[3], 10, 64)
	// 		adv.Hostname = hostname
	// 		adv.BeaconType = "hb_button"
	// 		adv.MAC = s[1]
	// 		adv.RSSI = i
	// 		adv.Data = rawdata
	// 		adv.HSButtonCounter = buttonCounter
	// 		read_line := strings.TrimRight(string(s[5]), "\r\n")
	// 		it, err33 := strconv.Atoi(read_line)
	// 		if err33 != nil {
	// 			fmt.Println(it)
	// 			fmt.Println(err33)
	// 			os.Exit(2)
	// 		}
	// 	}
	// }
}

var messagePubHandler = func(msg mqtt.Message, writer *kafka.Writer, appState *appcontext.AppState) {
	mqtthandler(writer, msg.Topic(), msg.Payload(), appState)
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	fmt.Println("Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("Connect lost: %v", err)
}

func main() {
	// Load global context to init beacons and latest list
	appState := appcontext.NewAppState()
	cfg := config.Load()

	// Create log file -> this section and below can be moved into a package, as it is always the same
	logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		log.Fatalf("Failed to open log file: %v\n", err)
	}

	// shell and log file multiwriter
	w := io.MultiWriter(os.Stderr, logFile)
	logger := slog.New(slog.NewJSONHandler(w, nil))
	slog.SetDefault(logger)

	// define context
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
	defer stop()

	// define kafka readers
	apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "bridge-api")
	alertReader := appState.AddKafkaReader(cfg.KafkaURL, "alert", "bridge-alert")

	// define kafka writer
	writer := appState.AddKafkaWriter(cfg.KafkaURL, "rawbeacons")

	slog.Info("Bridge initialized, subscribed to kafka topics")

	chApi := make(chan model.ApiUpdate, 200)
	chAlert := make(chan model.Alert, 200)

	wg.Add(2)
	go kafkaclient.Consume(apiReader, chApi, ctx, &wg)
	go kafkaclient.Consume(alertReader, chAlert, ctx, &wg)

	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.MQTTHost, 1883))
	opts.SetClientID("go_mqtt_client")
	opts.SetAutoReconnect(true)
	opts.SetConnectRetry(true)
	opts.SetConnectRetryInterval(1 * time.Second)
	opts.SetMaxReconnectInterval(600 * time.Second)
	opts.SetCleanSession(false)
	opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) {
		messagePubHandler(m, writer, appState)
	})
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler

	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	sub(client)

eventloop:
	for {
		select {
		case <-ctx.Done():
			break eventloop
		case msg := <-chApi:
			switch msg.Method {
			case "POST":
				id := msg.ID
				appState.AddBeaconToLookup(msg.MAC, id)
				lMsg := fmt.Sprintf("Beacon added to lookup: %s", id)
				slog.Info(lMsg)
			case "DELETE":
				id := msg.MAC
				if id == "all" {
					appState.CleanLookup()
					fmt.Println("cleaned up lookup map")
					continue
				}
				appState.RemoveBeaconFromLookup(id)
				lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id)
				slog.Info(lMsg)
			}
		case msg := <-chAlert:
			fmt.Printf("Alerts: %+v\n", msg)
			p, err := json.Marshal(msg)
			if err != nil {
				continue
			}
			client.Publish("/alerts", 0, true, p)
		}
	}

	slog.Info("broken out of the main event loop")
	wg.Wait()
	slog.Info("All goroutines have stopped, beginning to close Kafka connections")
	appState.CleanKafkaReaders()
	appState.CleanKafkaWriters()
	client.Disconnect(250)
	slog.Info("Closing connection to MQTT broker")
}

// sub subscribes the MQTT client to the gateway publish topic; messages are
// delivered through the default publish handler configured in main.
func sub(client mqtt.Client) {
	topic := "publish_out/#"
	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	fmt.Printf("Subscribed to topic: %s\n", topic)
}