diff --git a/bridge b/bridge index 7f49ad8..81c808d 100755 Binary files a/bridge and b/bridge differ diff --git a/decoder b/decoder index e6f87f0..c372479 100755 Binary files a/decoder and b/decoder differ diff --git a/internal/app/bridge/app.go b/internal/app/bridge/app.go index 5cc7f39..4c7a2e8 100644 --- a/internal/app/bridge/app.go +++ b/internal/app/bridge/app.go @@ -5,6 +5,7 @@ import ( "encoding/json" "log/slog" "sync" + "time" "github.com/AFASystems/presence/internal/pkg/bridge" "github.com/AFASystems/presence/internal/pkg/common/appcontext" @@ -13,6 +14,7 @@ import ( "github.com/AFASystems/presence/internal/pkg/logger" "github.com/AFASystems/presence/internal/pkg/model" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/segmentio/kafka-go" ) // BridgeApp holds dependencies for the bridge service (MQTT <-> Kafka). @@ -71,10 +73,26 @@ func (a *BridgeApp) Run(ctx context.Context) { go kafkaclient.Consume(a.KafkaManager.GetReader("alert"), a.ChAlert, ctx, &a.wg) go kafkaclient.Consume(a.KafkaManager.GetReader("mqtt"), a.ChMqtt, ctx, &a.wg) + healthTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL) + defer healthTicker.Stop() + for { select { case <-ctx.Done(): return + case <-healthTicker.C: + health, err := a.AppState.GetBridgeHealth(a.KafkaManager) + if err != nil { + slog.Error("getting bridge health", "err", err) + continue + } + m := kafka.Message{ + Value: health, + } + if err := kafkaclient.Write(ctx, a.KafkaManager.GetWriter("healthbridge"), m); err != nil { + slog.Error("writing bridge health", "err", err) + continue + } case msg := <-a.ChApi: switch msg.Method { case "POST": diff --git a/internal/app/decoder/app.go b/internal/app/decoder/app.go index 65d694e..c822590 100644 --- a/internal/app/decoder/app.go +++ b/internal/app/decoder/app.go @@ -4,6 +4,7 @@ import ( "context" "log/slog" "sync" + "time" "github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/config" @@ -11,6 +12,7 @@ import ( "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/logger" "github.com/AFASystems/presence/internal/pkg/model" + "github.com/segmentio/kafka-go" ) // DecoderApp holds dependencies for the decoder service. @@ -60,10 +62,27 @@ func (a *DecoderApp) Run(ctx context.Context) { go kafkaclient.Consume(a.KafkaManager.GetReader("rawbeacons"), a.ChRaw, ctx, &a.wg) go kafkaclient.Consume(a.KafkaManager.GetReader("parser"), a.ChParser, ctx, &a.wg) + healthTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL) + defer healthTicker.Stop() + for { select { case <-ctx.Done(): return + case <-healthTicker.C: + health, err := a.AppState.GetDecoderHealth(a.KafkaManager) + slog.Info("decoder health", "health", string(health)) + if err != nil { + slog.Error("getting decoder health", "err", err) + continue + } + m := kafka.Message{ + Value: health, + } + if err := kafkaclient.Write(ctx, a.KafkaManager.GetWriter("healthdecoder"), m); err != nil { + slog.Error("writing decoder health", "err", err) + continue + } case msg := <-a.ChRaw: decoder.ProcessIncoming(msg, a.AppState, a.KafkaManager.GetWriter("alertbeacons"), a.ParserRegistry) case msg := <-a.ChParser: diff --git a/internal/app/location/app.go b/internal/app/location/app.go index 472b2ea..fd40178 100644 --- a/internal/app/location/app.go +++ b/internal/app/location/app.go @@ -62,10 +62,26 @@ func (a *LocationApp) Run(ctx context.Context) { locTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL) defer locTicker.Stop() + healthTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL) + defer healthTicker.Stop() + for { select { case <-ctx.Done(): return + case <-healthTicker.C: + health, err := a.AppState.GetLocationHealth(a.KafkaManager) + if err != nil { + slog.Error("getting location health", "err", err) + continue + } + m := kafka.Message{ + Value: health, + } + if err := kafkaclient.Write(ctx, a.KafkaManager.GetWriter("healthlocation"), m); err != nil { + slog.Error("writing location health", "err", err) + continue + } case <-locTicker.C: settings := a.AppState.GetSettings() slog.Info("current algorithm", "algorithm", settings.CurrentAlgorithm) diff --git a/internal/app/server/events.go b/internal/app/server/events.go index f33fb20..033e521 100644 --- a/internal/app/server/events.go +++ b/internal/app/server/events.go @@ -22,6 +22,12 @@ func RunEventLoop(ctx context.Context, a *ServerApp) { select { case <-ctx.Done(): return + case msg := <-a.ChHealthLocation: + slog.Info("location health", "health", msg) + case msg := <-a.ChHealthDecoder: + slog.Info("decoder health", "health", msg) + case msg := <-a.ChHealthBridge: + slog.Info("bridge health", "health", msg) case msg := <-a.ChLoc: switch msg.Method { case "Standard": diff --git a/internal/pkg/common/appcontext/context.go b/internal/pkg/common/appcontext/context.go index 3736cf8..1fd6a53 100644 --- a/internal/pkg/common/appcontext/context.go +++ b/internal/pkg/common/appcontext/context.go @@ -4,7 +4,6 @@ import ( "fmt" "log/slog" "os" - "sync" "time" "github.com/AFASystems/presence/internal/pkg/kafkaclient" @@ -13,14 +12,12 @@ import ( // AppState provides centralized access to application state type AppState struct { - beacons BeaconsList - settings Settings - beaconEvents BeaconEventList - beaconsLookup BeaconsLookup - locationHealth LocationHealth - decoderHealth DecoderHealth - bridgeHealth BridgeHealth - startTime time.Time + beacons BeaconsList + settings Settings + beaconEvents BeaconEventList + beaconsLookup BeaconsLookup + health Health + startTime time.Time } func getEnv(key, def string) string { @@ -54,58 +51,58 @@ func NewAppState() *AppState { beaconsLookup: BeaconsLookup{ Lookup: make(map[string]string), }, - locationHealth: LocationHealth{ - BaseHealth: BaseHealth{ - Lock: sync.RWMutex{}, - Uptime: 0, - ActiveReaders: []string{}, - ActiveWriters: []string{}, - ActiveBeacons: []string{}, + health: Health{ + ID: "1", + Location: LocationHealth{ + BaseHealth: BaseHealth{ + Uptime: 0, + ActiveReaders: []string{}, + ActiveWriters: []string{}, + ActiveBeacons: []string{}, + }, }, - }, - decoderHealth: DecoderHealth{ - BaseHealth: BaseHealth{ - Lock: sync.RWMutex{}, - Uptime: 0, - ActiveReaders: []string{}, - ActiveWriters: []string{}, - ActiveBeacons: []string{}, + Decoder: DecoderHealth{ + BaseHealth: BaseHealth{ + Uptime: 0, + ActiveReaders: []string{}, + ActiveWriters: []string{}, + ActiveBeacons: []string{}, + }, }, - }, - bridgeHealth: BridgeHealth{ - BaseHealth: BaseHealth{ - Lock: sync.RWMutex{}, - Uptime: 0, - ActiveReaders: []string{}, - ActiveWriters: []string{}, - ActiveBeacons: []string{}, + Bridge: BridgeHealth{ + BaseHealth: BaseHealth{ + Uptime: 0, + ActiveReaders: []string{}, + ActiveWriters: []string{}, + ActiveBeacons: []string{}, + }, }, }, } } -func (m *AppState) GetLocationHealth(k *kafkaclient.KafkaManager) *LocationHealth { - m.locationHealth.GetUptime(m.startTime) - m.locationHealth.GetActiveReaders(k) - m.locationHealth.GetActiveWriters(k) - m.locationHealth.GetActiveBeacons(m) - return &m.locationHealth +func (m *AppState) GetLocationHealth(k *kafkaclient.KafkaManager) ([]byte, error) { + m.health.Location.GetUptime(m.startTime) + m.health.Location.GetActiveReaders(k) + m.health.Location.GetActiveWriters(k) + m.health.Location.GetActiveBeacons(m) + return m.health.Location.Marshal() } -func (m *AppState) GetDecoderHealth(k *kafkaclient.KafkaManager) *DecoderHealth { - m.decoderHealth.GetUptime(m.startTime) - m.decoderHealth.GetActiveReaders(k) - m.decoderHealth.GetActiveWriters(k) - m.decoderHealth.GetActiveBeacons(m) - return &m.decoderHealth +func (m *AppState) GetDecoderHealth(k *kafkaclient.KafkaManager) ([]byte, error) { + m.health.Decoder.GetUptime(m.startTime) + m.health.Decoder.GetActiveReaders(k) + m.health.Decoder.GetActiveWriters(k) + m.health.Decoder.GetActiveBeacons(m) + return m.health.Decoder.Marshal() } -func (m *AppState) GetBridgeHealth(k *kafkaclient.KafkaManager) *BridgeHealth { - m.bridgeHealth.GetUptime(m.startTime) - m.bridgeHealth.GetActiveReaders(k) - m.bridgeHealth.GetActiveWriters(k) - m.bridgeHealth.GetActiveBeacons(m) - return &m.bridgeHealth +func (m *AppState) GetBridgeHealth(k *kafkaclient.KafkaManager) ([]byte, error) { + m.health.Bridge.GetUptime(m.startTime) + m.health.Bridge.GetActiveReaders(k) + m.health.Bridge.GetActiveWriters(k) + m.health.Bridge.GetActiveBeacons(m) + return m.health.Bridge.Marshal() } // GetBeacons returns thread-safe access to beacons list diff --git a/internal/pkg/common/appcontext/health.go b/internal/pkg/common/appcontext/health.go index 0dec384..6852a77 100644 --- a/internal/pkg/common/appcontext/health.go +++ b/internal/pkg/common/appcontext/health.go @@ -2,18 +2,16 @@ package appcontext import ( "encoding/json" - "sync" "time" "github.com/AFASystems/presence/internal/pkg/kafkaclient" ) type BaseHealth struct { - Lock sync.RWMutex Uptime time.Duration `json:"uptime"` - ActiveReaders []string `json:"activeReaders"` - ActiveWriters []string `json:"activeWriters"` - ActiveBeacons []string `json:"activeBeacons"` + ActiveReaders []string `json:"activeReaders" gorm:"type:jsonb"` + ActiveWriters []string `json:"activeWriters" gorm:"type:jsonb"` + ActiveBeacons []string `json:"activeBeacons" gorm:"type:jsonb"` } type DecoderHealth struct { @@ -23,38 +21,37 @@ type DecoderHealth struct { type LocationHealth struct { BaseHealth - ActiveSettings []Settings `json:"activeSettings"` + ActiveSettings []Settings `json:"activeSettings" gorm:"type:jsonb"` } type BridgeHealth struct { BaseHealth } +type Health struct { + ID string `json:"id" gorm:"primaryKey"` + Location LocationHealth `gorm:"embedded;embeddedPrefix:loc_"` + Decoder DecoderHealth `gorm:"embedded;embeddedPrefix:dec_"` + Bridge BridgeHealth `gorm:"embedded;embeddedPrefix:brg_"` +} + func (b *BaseHealth) GetUptime(startTime time.Time) { - b.Lock.Lock() b.Uptime = time.Since(startTime) - b.Lock.Unlock() } func (b *BaseHealth) GetActiveReaders(m *kafkaclient.KafkaManager) { - b.Lock.Lock() b.ActiveReaders = m.GetReaders() - b.Lock.Unlock() } func (b *BaseHealth) GetActiveWriters(m *kafkaclient.KafkaManager) { - b.Lock.Lock() b.ActiveWriters = m.GetWriters() - b.Lock.Unlock() } func (b *BaseHealth) GetActiveBeacons(m *AppState) { - b.Lock.Lock() beacons := m.GetAllBeacons() for beacon := range beacons { b.ActiveBeacons = append(b.ActiveBeacons, beacon) } - b.Lock.Unlock() } func (d *DecoderHealth) Marshal() ([]byte, error) { diff --git a/internal/pkg/database/database.go b/internal/pkg/database/database.go index 70aee1e..4d0b515 100644 --- a/internal/pkg/database/database.go +++ b/internal/pkg/database/database.go @@ -26,7 +26,7 @@ func Connect(cfg *config.Config) (*gorm.DB, error) { return nil, err } - if err := db.AutoMigrate(&model.Gateway{}, model.Zone{}, model.TrackerZones{}, model.Tracker{}, model.Config{}, appcontext.Settings{}, model.Tracks{}, &model.Alert{}); err != nil { + if err := db.AutoMigrate(&model.Gateway{}, model.Zone{}, model.TrackerZones{}, model.Tracker{}, model.Config{}, appcontext.Settings{}, model.Tracks{}, &model.Alert{}, appcontext.Health{}); err != nil { return nil, err } diff --git a/location b/location index 0d97725..09b3dc5 100755 Binary files a/location and b/location differ diff --git a/server b/server index 09cf83c..b3d430d 100755 Binary files a/server and b/server differ