diff --git a/cmd/bridge/main.go b/cmd/bridge/main.go index ab04557..d212687 100644 --- a/cmd/bridge/main.go +++ b/cmd/bridge/main.go @@ -15,7 +15,7 @@ func main() { defer stop() cfg := config.LoadBridge() - app, err := bridge.New(cfg) + app, err := bridge.New(cfg, ctx) if err != nil { log.Fatalf("bridge: %v", err) } diff --git a/internal/app/bridge/app.go b/internal/app/bridge/app.go index 2e72285..0513723 100644 --- a/internal/app/bridge/app.go +++ b/internal/app/bridge/app.go @@ -31,7 +31,7 @@ type BridgeApp struct { } // New creates a BridgeApp with Kafka readers (apibeacons, alert, mqtt), writer (rawbeacons), and MQTT client. -func New(cfg *config.Config) (*BridgeApp, error) { +func New(cfg *config.Config, ctx context.Context) (*BridgeApp, error) { appState := appcontext.NewAppState() kafkaManager := kafkaclient.InitKafkaManager() @@ -46,7 +46,7 @@ func New(cfg *config.Config) (*BridgeApp, error) { writer := kafkaManager.GetWriter("rawbeacons") mqttClient, err := bridge.NewMQTTClient(cfg, func(m mqtt.Message) { - bridge.HandleMQTTMessage(m.Topic(), m.Payload(), appState, writer) + bridge.HandleMQTTMessage(m.Topic(), m.Payload(), appState, writer, ctx) }) if err != nil { cleanup() diff --git a/internal/pkg/bridge/handler.go b/internal/pkg/bridge/handler.go index b7d1b01..9f586b7 100644 --- a/internal/pkg/bridge/handler.go +++ b/internal/pkg/bridge/handler.go @@ -16,7 +16,7 @@ import ( // HandleMQTTMessage processes an MQTT message: parses JSON array of RawReading or CSV. // For JSON, converts each reading to BeaconAdvertisement and writes to the writer if MAC is in lookup. // Hostname is derived from topic (e.g. "publish_out/gateway1" -> "gateway1"). Safe if topic has no "/". -func HandleMQTTMessage(topic string, payload []byte, appState *appcontext.AppState, writer *kafka.Writer) { +func HandleMQTTMessage(topic string, payload []byte, appState *appcontext.AppState, writer *kafka.Writer, ctx context.Context) { parts := strings.SplitN(topic, "/", 2) hostname := "" if len(parts) >= 2 { @@ -51,7 +51,7 @@ func HandleMQTTMessage(topic string, payload []byte, appState *appcontext.AppSta slog.Error("marshaling beacon advertisement", "err", err) break } - if err := kafkaclient.Write(context.Background(), writer, kafka.Message{Value: encoded}); err != nil { + if err := kafkaclient.Write(ctx, writer, kafka.Message{Value: encoded}); err != nil { slog.Error("writing to Kafka", "err", err) time.Sleep(1 * time.Second) break