From 1afa01b5c5b025616dc3829dfe98e26527480354 Mon Sep 17 00:00:00 2001 From: blazSmehov Date: Wed, 31 Dec 2025 13:08:02 +0100 Subject: [PATCH] feat: add alerts topic, publish alert messages to MQTT broker from the bridge --- build/init-scripts/create_topic.sh | 5 ++++ cmd/bridge/main.go | 33 +++++++++++++++----------- cmd/server/main.go | 3 ++- internal/pkg/model/types.go | 6 +++++ internal/pkg/service/beacon_service.go | 22 +++++++++++++---- 5 files changed, 50 insertions(+), 19 deletions(-) diff --git a/build/init-scripts/create_topic.sh b/build/init-scripts/create_topic.sh index bc764a1..d197a57 100755 --- a/build/init-scripts/create_topic.sh +++ b/build/init-scripts/create_topic.sh @@ -23,4 +23,9 @@ # create topic settings /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ --create --if-not-exists --topic settings \ +--partitions 1 --replication-factor 1 + +# create topic alert +/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ +--create --if-not-exists --topic alert \ --partitions 1 --replication-factor 1 \ No newline at end of file diff --git a/cmd/bridge/main.go b/cmd/bridge/main.go index 4bc654e..216d893 100644 --- a/cmd/bridge/main.go +++ b/cmd/bridge/main.go @@ -134,23 +134,31 @@ func main() { ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) defer stop() - // define kafka reader + // define kafka readers apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "bridge-api") + alertReader := appState.AddKafkaReader(cfg.KafkaURL, "alert", "bridge-alert") + // define kafka writer writer := appState.AddKafkaWriter(cfg.KafkaURL, "rawbeacons") slog.Info("Bridge initialized, subscribed to kafka topics") chApi := make(chan model.ApiUpdate, 200) + chAlert := make(chan model.Alert, 200) - wg.Add(1) + wg.Add(2) go kafkaclient.Consume(apiReader, chApi, ctx, &wg) + go kafkaclient.Consume(alertReader, chAlert, ctx, &wg) opts := mqtt.NewClientOptions() opts.AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.MQTTHost, 1883)) opts.SetClientID("go_mqtt_client") - opts.SetUsername("emqx") - opts.SetPassword("public") + opts.SetAutoReconnect(true) + opts.SetConnectRetry(true) + opts.SetConnectRetryInterval(1 * time.Second) + opts.SetMaxReconnectInterval(600 * time.Second) + opts.SetCleanSession(false) + opts.SetDefaultPublishHandler(func(c mqtt.Client, m mqtt.Message) { messagePubHandler(m, writer, appState) }) opts.OnConnect = connectHandler opts.OnConnectionLost = connectLostHandler @@ -184,6 +192,13 @@ eventloop: lMsg := fmt.Sprintf("Beacon removed from lookup: %s", id) slog.Info(lMsg) } + case msg := <-chAlert: + fmt.Printf("Alerts: %+v\n", msg) + p, err := json.Marshal(msg) + if err != nil { + continue + } + client.Publish("/alerts", 0, true, p) } } @@ -198,16 +213,6 @@ eventloop: slog.Info("Closing connection to MQTT broker") } -func publish(client mqtt.Client) { - num := 10 - for i := 0; i < num; i++ { - text := fmt.Sprintf("Message %d", i) - token := client.Publish("topic/test", 0, false, text) - token.Wait() - time.Sleep(time.Second) - } -} - func sub(client mqtt.Client) { topic := "publish_out/#" token := client.Subscribe(topic, 1, nil) diff --git a/cmd/server/main.go b/cmd/server/main.go index c3b474d..b74d5e4 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -65,6 +65,7 @@ func main() { writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons") settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings") + alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alert") slog.Info("Kafka writers topics: apibeacons, settings initialized") if err := apiclient.UpdateDB(db, ctx, cfg, writer); err != nil { @@ -131,7 +132,7 @@ eventLoop: case <-ctx.Done(): break eventLoop case msg := <-chLoc: - service.LocationToBeaconService(msg, db) + service.LocationToBeaconService(msg, db, alertWriter, ctx) case msg := <-chEvents: fmt.Printf("event: %+v\n", msg) id := msg.ID diff --git a/internal/pkg/model/types.go b/internal/pkg/model/types.go index ae0a1c2..9fbae9a 100644 --- a/internal/pkg/model/types.go +++ b/internal/pkg/model/types.go @@ -183,3 +183,9 @@ type KafkaWritersList struct { KafkaWritersLock sync.RWMutex KafkaWriters []*kafka.Writer } + +type Alert struct { + ID string `json:"id"` // tracker id + Type string `json:"type"` // type of alert + Value string `json:"value"` // possible value +} diff --git a/internal/pkg/service/beacon_service.go b/internal/pkg/service/beacon_service.go index ea23b0d..7cf732a 100644 --- a/internal/pkg/service/beacon_service.go +++ b/internal/pkg/service/beacon_service.go @@ -2,22 +2,22 @@ package service import ( "context" + "encoding/json" "fmt" "slices" "strings" "github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/model" + "github.com/segmentio/kafka-go" "gorm.io/gorm" ) -func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB) { +func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB, writer *kafka.Writer, ctx context.Context) { if msg.ID == "" { return } - fmt.Println("msg id: ", msg.ID) - var zones []model.TrackerZones if err := db.Select("zoneList").Where("tracker = ?", msg.ID).Find(&zones).Error; err != nil { return @@ -38,7 +38,21 @@ func LocationToBeaconService(msg model.HTTPLocation, db *gorm.DB) { } if len(allowedZones) != 0 && !slices.Contains(allowedZones, gw.ID) { - fmt.Println("Alert") + alert := model.Alert{ + ID: msg.ID, + Type: "Restricted zone", + Value: gw.ID, + } + + eMsg, err := json.Marshal(alert) + if err != nil { + fmt.Println("Error in marshaling") + } else { + msg := kafka.Message{ + Value: eMsg, + } + writer.WriteMessages(ctx, msg) + } } if err := db.Updates(&model.Tracker{ID: msg.ID, Location: gw.ID, Distance: msg.Distance}).Error; err != nil {