| @@ -28,4 +28,19 @@ | |||||
| # create topic alert | # create topic alert | ||||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | ||||
| --create --if-not-exists --topic alert \ | --create --if-not-exists --topic alert \ | ||||
| --partitions 1 --replication-factor 1 | |||||
| # create topic healthlocation | |||||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | |||||
| --create --if-not-exists --topic healthlocation \ | |||||
| --partitions 1 --replication-factor 1 | |||||
| # create topic healthdecoder | |||||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | |||||
| --create --if-not-exists --topic healthdecoder \ | |||||
| --partitions 1 --replication-factor 1 | |||||
| # create topic healthbridge | |||||
| /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka:29092 \ | |||||
| --create --if-not-exists --topic healthbridge \ | |||||
| --partitions 1 --replication-factor 1 | --partitions 1 --replication-factor 1 | ||||
| @@ -37,7 +37,7 @@ func New(cfg *config.Config) (*BridgeApp, error) { | |||||
| slog.SetDefault(srvLogger) | slog.SetDefault(srvLogger) | ||||
| readerTopics := []string{"apibeacons", "alert", "mqtt"} | readerTopics := []string{"apibeacons", "alert", "mqtt"} | ||||
| writerTopics := []string{"rawbeacons"} | |||||
| writerTopics := []string{"rawbeacons", "healthbridge"} | |||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "bridge", readerTopics) | kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "bridge", readerTopics) | ||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | ||||
| slog.Info("bridge service initialized", "readers", readerTopics, "writers", writerTopics) | slog.Info("bridge service initialized", "readers", readerTopics, "writers", writerTopics) | ||||
| @@ -34,7 +34,7 @@ func New(cfg *config.Config) (*DecoderApp, error) { | |||||
| slog.SetDefault(srvLogger) | slog.SetDefault(srvLogger) | ||||
| readerTopics := []string{"rawbeacons", "parser"} | readerTopics := []string{"rawbeacons", "parser"} | ||||
| writerTopics := []string{"alertbeacons"} | |||||
| writerTopics := []string{"alertbeacons", "healthdecoder"} | |||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "decoder", readerTopics) | kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "decoder", readerTopics) | ||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | ||||
| slog.Info("decoder service initialized", "readers", readerTopics, "writers", writerTopics) | slog.Info("decoder service initialized", "readers", readerTopics, "writers", writerTopics) | ||||
| @@ -37,7 +37,7 @@ func New(cfg *config.Config) (*LocationApp, error) { | |||||
| slog.SetDefault(srvLogger) | slog.SetDefault(srvLogger) | ||||
| readerTopics := []string{"rawbeacons", "settings"} | readerTopics := []string{"rawbeacons", "settings"} | ||||
| writerTopics := []string{"locevents"} | |||||
| writerTopics := []string{"locevents", "healthlocation"} | |||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "location", readerTopics) | kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "location", readerTopics) | ||||
| kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics) | ||||
| slog.Info("location service initialized", "readers", readerTopics, "writers", writerTopics) | slog.Info("location service initialized", "readers", readerTopics, "writers", writerTopics) | ||||
| @@ -23,16 +23,19 @@ import ( | |||||
| // ServerApp holds dependencies and state for the server service. | // ServerApp holds dependencies and state for the server service. | ||||
| type ServerApp struct { | type ServerApp struct { | ||||
| Cfg *config.Config | |||||
| DB *gorm.DB | |||||
| KafkaManager *kafkaclient.KafkaManager | |||||
| AppState *appcontext.AppState | |||||
| ChLoc chan model.HTTPLocation | |||||
| ChEvents chan appcontext.BeaconEvent | |||||
| ctx context.Context | |||||
| Server *http.Server | |||||
| Cleanup func() | |||||
| wg sync.WaitGroup | |||||
| Cfg *config.Config | |||||
| DB *gorm.DB | |||||
| KafkaManager *kafkaclient.KafkaManager | |||||
| AppState *appcontext.AppState | |||||
| ChLoc chan model.HTTPLocation | |||||
| ChEvents chan appcontext.BeaconEvent | |||||
| ChHealthLocation chan appcontext.LocationHealth | |||||
| ChHealthDecoder chan appcontext.DecoderHealth | |||||
| ChHealthBridge chan appcontext.BridgeHealth | |||||
| ctx context.Context | |||||
| Server *http.Server | |||||
| Cleanup func() | |||||
| wg sync.WaitGroup | |||||
| } | } | ||||
| // New creates a ServerApp: loads config, creates logger, connects DB, creates Kafka manager and writers. | // New creates a ServerApp: loads config, creates logger, connects DB, creates Kafka manager and writers. | ||||
| @@ -98,16 +101,22 @@ func (a *ServerApp) Init(ctx context.Context) error { | |||||
| slog.Error("UpdateDB", "err", err) | slog.Error("UpdateDB", "err", err) | ||||
| } | } | ||||
| readerTopics := []string{"locevents", "alertbeacons", "health"} | |||||
| readerTopics := []string{"locevents", "alertbeacons", "healthlocation", "healthdecoder", "healthbridge"} | |||||
| a.KafkaManager.PopulateKafkaManager(a.Cfg.KafkaURL, "server", readerTopics) | a.KafkaManager.PopulateKafkaManager(a.Cfg.KafkaURL, "server", readerTopics) | ||||
| slog.Info("Kafka readers initialized", "topics", readerTopics) | slog.Info("Kafka readers initialized", "topics", readerTopics) | ||||
| a.ChLoc = make(chan model.HTTPLocation, config.SMALL_CHANNEL_SIZE) | a.ChLoc = make(chan model.HTTPLocation, config.SMALL_CHANNEL_SIZE) | ||||
| a.ChEvents = make(chan appcontext.BeaconEvent, config.MEDIUM_CHANNEL_SIZE) | a.ChEvents = make(chan appcontext.BeaconEvent, config.MEDIUM_CHANNEL_SIZE) | ||||
| a.ChHealthLocation = make(chan appcontext.LocationHealth, config.SMALL_CHANNEL_SIZE) | |||||
| a.ChHealthDecoder = make(chan appcontext.DecoderHealth, config.SMALL_CHANNEL_SIZE) | |||||
| a.ChHealthBridge = make(chan appcontext.BridgeHealth, config.SMALL_CHANNEL_SIZE) | |||||
| a.wg.Add(2) | |||||
| a.wg.Add(5) | |||||
| go kafkaclient.Consume(a.KafkaManager.GetReader("locevents"), a.ChLoc, ctx, &a.wg) | go kafkaclient.Consume(a.KafkaManager.GetReader("locevents"), a.ChLoc, ctx, &a.wg) | ||||
| go kafkaclient.Consume(a.KafkaManager.GetReader("alertbeacons"), a.ChEvents, ctx, &a.wg) | go kafkaclient.Consume(a.KafkaManager.GetReader("alertbeacons"), a.ChEvents, ctx, &a.wg) | ||||
| go kafkaclient.Consume(a.KafkaManager.GetReader("healthlocation"), a.ChHealthLocation, ctx, &a.wg) | |||||
| go kafkaclient.Consume(a.KafkaManager.GetReader("healthdecoder"), a.ChHealthDecoder, ctx, &a.wg) | |||||
| go kafkaclient.Consume(a.KafkaManager.GetReader("healthbridge"), a.ChHealthBridge, ctx, &a.wg) | |||||
| a.Server = &http.Server{ | a.Server = &http.Server{ | ||||
| Addr: a.Cfg.HTTPAddr, | Addr: a.Cfg.HTTPAddr, | ||||