From 5fd72d81513fff46529e4d7a1055976edb8500eb Mon Sep 17 00:00:00 2001 From: blazSmehov Date: Tue, 10 Mar 2026 12:48:18 +0100 Subject: [PATCH] feat: add new topics, readers and writers --- build/init-scripts/create_topic.sh | 15 ++++++++++++++ internal/app/bridge/app.go | 2 +- internal/app/decoder/app.go | 2 +- internal/app/location/app.go | 2 +- internal/app/server/app.go | 33 +++++++++++++++++++----------- 5 files changed, 39 insertions(+), 15 deletions(-) diff --git a/build/init-scripts/create_topic.sh b/build/init-scripts/create_topic.sh index d197a57..1dd2534 100755 --- a/build/init-scripts/create_topic.sh +++ b/build/init-scripts/create_topic.sh @@ -28,4 +28,19 @@ # create topic alert /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ --create --if-not-exists --topic alert \ +--partitions 1 --replication-factor 1 + +# create topic healthlocation +/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ +--create --if-not-exists --topic healthlocation \ +--partitions 1 --replication-factor 1 + +# create topic healthdecoder +/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ +--create --if-not-exists --topic healthdecoder \ +--partitions 1 --replication-factor 1 + + # create topic healthbridge +/opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ +--create --if-not-exists --topic healthbridge \ --partitions 1 --replication-factor 1 \ No newline at end of file diff --git a/internal/app/bridge/app.go b/internal/app/bridge/app.go index 13f3e33..5cc7f39 100644 --- a/internal/app/bridge/app.go +++ b/internal/app/bridge/app.go @@ -37,7 +37,7 @@ func New(cfg *config.Config) (*BridgeApp, error) { slog.SetDefault(srvLogger) readerTopics := []string{"apibeacons", "alert", "mqtt"} - writerTopics := []string{"rawbeacons"} + writerTopics := []string{"rawbeacons", "healthbridge"} kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "bridge", readerTopics) kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) slog.Info("bridge service initialized", "readers", readerTopics, "writers", writerTopics) diff --git a/internal/app/decoder/app.go b/internal/app/decoder/app.go index 337cf53..65d694e 100644 --- a/internal/app/decoder/app.go +++ b/internal/app/decoder/app.go @@ -34,7 +34,7 @@ func New(cfg *config.Config) (*DecoderApp, error) { slog.SetDefault(srvLogger) readerTopics := []string{"rawbeacons", "parser"} - writerTopics := []string{"alertbeacons"} + writerTopics := []string{"alertbeacons", "healthdecoder"} kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "decoder", readerTopics) kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) slog.Info("decoder service initialized", "readers", readerTopics, "writers", writerTopics) diff --git a/internal/app/location/app.go b/internal/app/location/app.go index c0c0ddc..472b2ea 100644 --- a/internal/app/location/app.go +++ b/internal/app/location/app.go @@ -37,7 +37,7 @@ func New(cfg *config.Config) (*LocationApp, error) { slog.SetDefault(srvLogger) readerTopics := []string{"rawbeacons", "settings"} - writerTopics := []string{"locevents"} + writerTopics := []string{"locevents", "healthlocation"} kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "location", readerTopics) kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) slog.Info("location service initialized", "readers", readerTopics, "writers", writerTopics) diff --git a/internal/app/server/app.go b/internal/app/server/app.go index 4e438a5..821744f 100644 --- a/internal/app/server/app.go +++ b/internal/app/server/app.go @@ -23,16 +23,19 @@ import ( // ServerApp holds dependencies and state for the server service. type ServerApp struct { - Cfg *config.Config - DB *gorm.DB - KafkaManager *kafkaclient.KafkaManager - AppState *appcontext.AppState - ChLoc chan model.HTTPLocation - ChEvents chan appcontext.BeaconEvent - ctx context.Context - Server *http.Server - Cleanup func() - wg sync.WaitGroup + Cfg *config.Config + DB *gorm.DB + KafkaManager *kafkaclient.KafkaManager + AppState *appcontext.AppState + ChLoc chan model.HTTPLocation + ChEvents chan appcontext.BeaconEvent + ChHealthLocation chan appcontext.LocationHealth + ChHealthDecoder chan appcontext.DecoderHealth + ChHealthBridge chan appcontext.BridgeHealth + ctx context.Context + Server *http.Server + Cleanup func() + wg sync.WaitGroup } // New creates a ServerApp: loads config, creates logger, connects DB, creates Kafka manager and writers. @@ -98,16 +101,22 @@ func (a *ServerApp) Init(ctx context.Context) error { slog.Error("UpdateDB", "err", err) } - readerTopics := []string{"locevents", "alertbeacons", "health"} + readerTopics := []string{"locevents", "alertbeacons", "healthlocation", "healthdecoder", "healthbridge"} a.KafkaManager.PopulateKafkaManager(a.Cfg.KafkaURL, "server", readerTopics) slog.Info("Kafka readers initialized", "topics", readerTopics) a.ChLoc = make(chan model.HTTPLocation, config.SMALL_CHANNEL_SIZE) a.ChEvents = make(chan appcontext.BeaconEvent, config.MEDIUM_CHANNEL_SIZE) + a.ChHealthLocation = make(chan appcontext.LocationHealth, config.SMALL_CHANNEL_SIZE) + a.ChHealthDecoder = make(chan appcontext.DecoderHealth, config.SMALL_CHANNEL_SIZE) + a.ChHealthBridge = make(chan appcontext.BridgeHealth, config.SMALL_CHANNEL_SIZE) - a.wg.Add(2) + a.wg.Add(5) go kafkaclient.Consume(a.KafkaManager.GetReader("locevents"), a.ChLoc, ctx, &a.wg) go kafkaclient.Consume(a.KafkaManager.GetReader("alertbeacons"), a.ChEvents, ctx, &a.wg) + go kafkaclient.Consume(a.KafkaManager.GetReader("healthlocation"), a.ChHealthLocation, ctx, &a.wg) + go kafkaclient.Consume(a.KafkaManager.GetReader("healthdecoder"), a.ChHealthDecoder, ctx, &a.wg) + go kafkaclient.Consume(a.KafkaManager.GetReader("healthbridge"), a.ChHealthBridge, ctx, &a.wg) a.Server = &http.Server{ Addr: a.Cfg.HTTPAddr,