| @@ -15,7 +15,7 @@ func main() { | |||||
| defer stop() | defer stop() | ||||
| cfg := config.LoadBridge() | cfg := config.LoadBridge() | ||||
| app, err := bridge.New(cfg) | |||||
| app, err := bridge.New(cfg, ctx) | |||||
| if err != nil { | if err != nil { | ||||
| log.Fatalf("bridge: %v", err) | log.Fatalf("bridge: %v", err) | ||||
| } | } | ||||
| @@ -31,7 +31,7 @@ type BridgeApp struct { | |||||
| } | } | ||||
| // New creates a BridgeApp with Kafka readers (apibeacons, alert, mqtt), writer (rawbeacons), and MQTT client. | // New creates a BridgeApp with Kafka readers (apibeacons, alert, mqtt), writer (rawbeacons), and MQTT client. | ||||
| func New(cfg *config.Config) (*BridgeApp, error) { | |||||
| func New(cfg *config.Config, ctx context.Context) (*BridgeApp, error) { | |||||
| appState := appcontext.NewAppState() | appState := appcontext.NewAppState() | ||||
| kafkaManager := kafkaclient.InitKafkaManager() | kafkaManager := kafkaclient.InitKafkaManager() | ||||
| @@ -46,7 +46,7 @@ func New(cfg *config.Config) (*BridgeApp, error) { | |||||
| writer := kafkaManager.GetWriter("rawbeacons") | writer := kafkaManager.GetWriter("rawbeacons") | ||||
| mqttClient, err := bridge.NewMQTTClient(cfg, func(m mqtt.Message) { | mqttClient, err := bridge.NewMQTTClient(cfg, func(m mqtt.Message) { | ||||
| bridge.HandleMQTTMessage(m.Topic(), m.Payload(), appState, writer) | |||||
| bridge.HandleMQTTMessage(m.Topic(), m.Payload(), appState, writer, ctx) | |||||
| }) | }) | ||||
| if err != nil { | if err != nil { | ||||
| cleanup() | cleanup() | ||||
| @@ -5,7 +5,6 @@ import ( | |||||
| "encoding/json" | "encoding/json" | ||||
| "log/slog" | "log/slog" | ||||
| "strings" | "strings" | ||||
| "time" | |||||
| "github.com/AFASystems/presence/internal/pkg/common/appcontext" | "github.com/AFASystems/presence/internal/pkg/common/appcontext" | ||||
| "github.com/AFASystems/presence/internal/pkg/kafkaclient" | "github.com/AFASystems/presence/internal/pkg/kafkaclient" | ||||
| @@ -16,7 +15,7 @@ import ( | |||||
| // HandleMQTTMessage processes an MQTT message: parses JSON array of RawReading or CSV. | // HandleMQTTMessage processes an MQTT message: parses JSON array of RawReading or CSV. | ||||
| // For JSON, converts each reading to BeaconAdvertisement and writes to the writer if MAC is in lookup. | // For JSON, converts each reading to BeaconAdvertisement and writes to the writer if MAC is in lookup. | ||||
| // Hostname is derived from topic (e.g. "publish_out/gateway1" -> "gateway1"). Safe if topic has no "/". | // Hostname is derived from topic (e.g. "publish_out/gateway1" -> "gateway1"). Safe if topic has no "/". | ||||
| func HandleMQTTMessage(topic string, payload []byte, appState *appcontext.AppState, writer *kafka.Writer) { | |||||
| func HandleMQTTMessage(topic string, payload []byte, appState *appcontext.AppState, writer *kafka.Writer, ctx context.Context) { | |||||
| parts := strings.SplitN(topic, "/", 2) | parts := strings.SplitN(topic, "/", 2) | ||||
| hostname := "" | hostname := "" | ||||
| if len(parts) >= 2 { | if len(parts) >= 2 { | ||||
| @@ -51,10 +50,9 @@ func HandleMQTTMessage(topic string, payload []byte, appState *appcontext.AppSta | |||||
| slog.Error("marshaling beacon advertisement", "err", err) | slog.Error("marshaling beacon advertisement", "err", err) | ||||
| break | break | ||||
| } | } | ||||
| if err := kafkaclient.Write(context.Background(), writer, kafka.Message{Value: encoded}); err != nil { | |||||
| if err := kafkaclient.Write(ctx, writer, kafka.Message{Value: encoded}); err != nil { | |||||
| slog.Error("writing to Kafka", "err", err) | slog.Error("writing to Kafka", "err", err) | ||||
| time.Sleep(1 * time.Second) | |||||
| break | |||||
| continue | |||||
| } | } | ||||
| } | } | ||||
| return | return | ||||
| @@ -59,15 +59,14 @@ func NewAppState() *AppState { | |||||
| Uptime: 0, | Uptime: 0, | ||||
| ActiveReaders: []string{}, | ActiveReaders: []string{}, | ||||
| ActiveWriters: []string{}, | ActiveWriters: []string{}, | ||||
| ActiveBeacons: []string{}, | |||||
| }, | }, | ||||
| ActiveBeacons: []string{}, | |||||
| }, | }, | ||||
| Decoder: DecoderHealth{ | Decoder: DecoderHealth{ | ||||
| BaseHealth: BaseHealth{ | BaseHealth: BaseHealth{ | ||||
| Uptime: 0, | Uptime: 0, | ||||
| ActiveReaders: []string{}, | ActiveReaders: []string{}, | ||||
| ActiveWriters: []string{}, | ActiveWriters: []string{}, | ||||
| ActiveBeacons: []string{}, | |||||
| }, | }, | ||||
| }, | }, | ||||
| Bridge: BridgeHealth{ | Bridge: BridgeHealth{ | ||||
| @@ -75,8 +74,8 @@ func NewAppState() *AppState { | |||||
| Uptime: 0, | Uptime: 0, | ||||
| ActiveReaders: []string{}, | ActiveReaders: []string{}, | ||||
| ActiveWriters: []string{}, | ActiveWriters: []string{}, | ||||
| ActiveBeacons: []string{}, | |||||
| }, | }, | ||||
| ActiveBeacons: []string{}, | |||||
| }, | }, | ||||
| Kafka: ServiceStatus{Status: "unknown"}, | Kafka: ServiceStatus{Status: "unknown"}, | ||||
| Database: ServiceStatus{Status: "unknown"}, | Database: ServiceStatus{Status: "unknown"}, | ||||
| @@ -117,10 +116,14 @@ func (m *AppState) UpdateServerHealth(kafka, database ServiceStatus) { | |||||
| } | } | ||||
| func (m *AppState) GetLocationHealth(k *kafkaclient.KafkaManager) ([]byte, error) { | func (m *AppState) GetLocationHealth(k *kafkaclient.KafkaManager) ([]byte, error) { | ||||
| beacons := m.GetAllBeacons() | |||||
| m.health.Location.ActiveBeacons = make([]string, 0, len(beacons)) | |||||
| for beacon := range beacons { | |||||
| m.health.Location.ActiveBeacons = append(m.health.Location.ActiveBeacons, beacon) | |||||
| } | |||||
| m.health.Location.GetUptime(m.startTime) | m.health.Location.GetUptime(m.startTime) | ||||
| m.health.Location.GetActiveReaders(k) | m.health.Location.GetActiveReaders(k) | ||||
| m.health.Location.GetActiveWriters(k) | m.health.Location.GetActiveWriters(k) | ||||
| m.health.Location.GetActiveBeacons(m) | |||||
| return m.health.Location.Marshal() | return m.health.Location.Marshal() | ||||
| } | } | ||||
| @@ -128,15 +131,18 @@ func (m *AppState) GetDecoderHealth(k *kafkaclient.KafkaManager) ([]byte, error) | |||||
| m.health.Decoder.GetUptime(m.startTime) | m.health.Decoder.GetUptime(m.startTime) | ||||
| m.health.Decoder.GetActiveReaders(k) | m.health.Decoder.GetActiveReaders(k) | ||||
| m.health.Decoder.GetActiveWriters(k) | m.health.Decoder.GetActiveWriters(k) | ||||
| m.health.Decoder.GetActiveBeacons(m) | |||||
| return m.health.Decoder.Marshal() | return m.health.Decoder.Marshal() | ||||
| } | } | ||||
| func (m *AppState) GetBridgeHealth(k *kafkaclient.KafkaManager) ([]byte, error) { | func (m *AppState) GetBridgeHealth(k *kafkaclient.KafkaManager) ([]byte, error) { | ||||
| beacons := m.GetBeaconLookup() | |||||
| m.health.Bridge.ActiveBeacons = make([]string, 0, len(beacons)) | |||||
| for beacon := range beacons { | |||||
| m.health.Bridge.ActiveBeacons = append(m.health.Bridge.ActiveBeacons, beacon) | |||||
| } | |||||
| m.health.Bridge.GetUptime(m.startTime) | m.health.Bridge.GetUptime(m.startTime) | ||||
| m.health.Bridge.GetActiveReaders(k) | m.health.Bridge.GetActiveReaders(k) | ||||
| m.health.Bridge.GetActiveWriters(k) | m.health.Bridge.GetActiveWriters(k) | ||||
| m.health.Bridge.GetActiveBeacons(m) | |||||
| return m.health.Bridge.Marshal() | return m.health.Bridge.Marshal() | ||||
| } | } | ||||
| @@ -179,6 +185,16 @@ func (m *AppState) CleanLookup() { | |||||
| m.beaconsLookup.Lock.Unlock() | m.beaconsLookup.Lock.Unlock() | ||||
| } | } | ||||
| func (m *AppState) GetBeaconLookup() map[string]string { | |||||
| m.beaconsLookup.Lock.RLock() | |||||
| defer m.beaconsLookup.Lock.RUnlock() | |||||
| cp := make(map[string]string, len(m.beaconsLookup.Lookup)) | |||||
| for k, v := range m.beaconsLookup.Lookup { | |||||
| cp[k] = v | |||||
| } | |||||
| return cp | |||||
| } | |||||
| // BeaconExists checks if a beacon exists in the lookup | // BeaconExists checks if a beacon exists in the lookup | ||||
| func (m *AppState) BeaconExists(id string) (string, bool) { | func (m *AppState) BeaconExists(id string) (string, bool) { | ||||
| m.beaconsLookup.Lock.RLock() | m.beaconsLookup.Lock.RLock() | ||||
| @@ -17,7 +17,6 @@ type BaseHealth struct { | |||||
| Uptime time.Duration `json:"uptime"` | Uptime time.Duration `json:"uptime"` | ||||
| ActiveReaders []string `json:"activeReaders" gorm:"type:jsonb"` | ActiveReaders []string `json:"activeReaders" gorm:"type:jsonb"` | ||||
| ActiveWriters []string `json:"activeWriters" gorm:"type:jsonb"` | ActiveWriters []string `json:"activeWriters" gorm:"type:jsonb"` | ||||
| ActiveBeacons []string `json:"activeBeacons" gorm:"type:jsonb"` | |||||
| } | } | ||||
| type DecoderHealth struct { | type DecoderHealth struct { | ||||
| @@ -27,11 +26,13 @@ type DecoderHealth struct { | |||||
| type LocationHealth struct { | type LocationHealth struct { | ||||
| BaseHealth | BaseHealth | ||||
| ActiveBeacons []string `json:"activeBeacons" gorm:"type:jsonb"` | |||||
| ActiveSettings []Settings `json:"activeSettings" gorm:"type:jsonb"` | ActiveSettings []Settings `json:"activeSettings" gorm:"type:jsonb"` | ||||
| } | } | ||||
| type BridgeHealth struct { | type BridgeHealth struct { | ||||
| BaseHealth | BaseHealth | ||||
| ActiveBeacons []string `json:"activeBeacons" gorm:"type:jsonb"` | |||||
| } | } | ||||
| type Health struct { | type Health struct { | ||||
| @@ -54,14 +55,6 @@ func (b *BaseHealth) GetActiveWriters(m *kafkaclient.KafkaManager) { | |||||
| b.ActiveWriters = m.GetWriters() | b.ActiveWriters = m.GetWriters() | ||||
| } | } | ||||
| func (b *BaseHealth) GetActiveBeacons(m *AppState) { | |||||
| beacons := m.GetAllBeacons() | |||||
| b.ActiveBeacons = []string{} | |||||
| for beacon := range beacons { | |||||
| b.ActiveBeacons = append(b.ActiveBeacons, beacon) | |||||
| } | |||||
| } | |||||
| func (d *DecoderHealth) Marshal() ([]byte, error) { | func (d *DecoderHealth) Marshal() ([]byte, error) { | ||||
| return json.Marshal(d) | return json.Marshal(d) | ||||
| } | } | ||||