diff --git a/bridge b/bridge index 7873388..a5d2d28 100755 Binary files a/bridge and b/bridge differ diff --git a/decoder b/decoder index 9bdc564..affd6fd 100755 Binary files a/decoder and b/decoder differ diff --git a/internal/app/server/events.go b/internal/app/server/events.go index d1b4af2..b66ec7a 100644 --- a/internal/app/server/events.go +++ b/internal/app/server/events.go @@ -13,15 +13,21 @@ import ( ) // RunEventLoop runs the server event loop until ctx is cancelled. -// Handles: location events -> LocationToBeaconService, alert events -> update tracker in DB, ticker -> publish trackers to mqtt. +// Handles: location events -> LocationToBeaconService, alert events -> update tracker in DB, ticker -> publish trackers to mqtt, health ticker -> Kafka and DB status. func RunEventLoop(ctx context.Context, a *ServerApp) { beaconTicker := time.NewTicker(config.MEDIUM_TICKER_INTERVAL) defer beaconTicker.Stop() + healthCheckTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL) + defer healthCheckTicker.Stop() for { select { case <-ctx.Done(): return + case <-healthCheckTicker.C: + kafkaSt := CheckKafkaHealth(ctx, a.Cfg.KafkaURL) + dbSt := CheckDBHealth(ctx, a.DB) + a.AppState.UpdateServerHealth(kafkaSt, dbSt) case msg := <-a.ChHealthLocation: a.AppState.UpdateLocationHealth(msg) case msg := <-a.ChHealthDecoder: diff --git a/internal/app/server/healthcheck.go b/internal/app/server/healthcheck.go new file mode 100644 index 0000000..9e480b9 --- /dev/null +++ b/internal/app/server/healthcheck.go @@ -0,0 +1,44 @@ +package server + +import ( + "context" + "strings" + "time" + + "github.com/AFASystems/presence/internal/pkg/common/appcontext" + "github.com/segmentio/kafka-go" + "gorm.io/gorm" +) + +const healthCheckTimeout = 3 * time.Second + +// CheckKafkaHealth dials the broker and returns ServiceStatus. Uses first broker from kafkaURL (comma-separated). +func CheckKafkaHealth(ctx context.Context, kafkaURL string) appcontext.ServiceStatus { + if kafkaURL == "" { + return appcontext.ServiceStatus{Status: "down", Message: "no broker URL"} + } + broker := strings.TrimSpace(strings.Split(kafkaURL, ",")[0]) + tctx, cancel := context.WithTimeout(ctx, healthCheckTimeout) + defer cancel() + dialer := &kafka.Dialer{Timeout: healthCheckTimeout} + conn, err := dialer.DialContext(tctx, "tcp", broker) + if err != nil { + return appcontext.ServiceStatus{Status: "down", Message: err.Error()} + } + _ = conn.Close() + return appcontext.ServiceStatus{Status: "up"} +} + +// CheckDBHealth pings the database and returns ServiceStatus. +func CheckDBHealth(ctx context.Context, db *gorm.DB) appcontext.ServiceStatus { + sqlDB, err := db.DB() + if err != nil { + return appcontext.ServiceStatus{Status: "down", Message: err.Error()} + } + tctx, cancel := context.WithTimeout(ctx, healthCheckTimeout) + defer cancel() + if err := sqlDB.PingContext(tctx); err != nil { + return appcontext.ServiceStatus{Status: "down", Message: err.Error()} + } + return appcontext.ServiceStatus{Status: "up"} +} diff --git a/internal/pkg/common/appcontext/context.go b/internal/pkg/common/appcontext/context.go index cdf3331..2a9237a 100644 --- a/internal/pkg/common/appcontext/context.go +++ b/internal/pkg/common/appcontext/context.go @@ -78,6 +78,8 @@ func NewAppState() *AppState { ActiveBeacons: []string{}, }, }, + Kafka: ServiceStatus{Status: "unknown"}, + Database: ServiceStatus{Status: "unknown"}, }, } } @@ -106,6 +108,14 @@ func (m *AppState) UpdateBridgeHealth(h BridgeHealth) { m.mu.Unlock() } +// UpdateServerHealth updates Kafka and database status (called by server after checking broker and DB). +func (m *AppState) UpdateServerHealth(kafka, database ServiceStatus) { + m.mu.Lock() + m.health.Kafka = kafka + m.health.Database = database + m.mu.Unlock() +} + func (m *AppState) GetLocationHealth(k *kafkaclient.KafkaManager) ([]byte, error) { m.health.Location.GetUptime(m.startTime) m.health.Location.GetActiveReaders(k) diff --git a/internal/pkg/common/appcontext/health.go b/internal/pkg/common/appcontext/health.go index 92ab6fb..b804eab 100644 --- a/internal/pkg/common/appcontext/health.go +++ b/internal/pkg/common/appcontext/health.go @@ -7,6 +7,12 @@ import ( "github.com/AFASystems/presence/internal/pkg/kafkaclient" ) +// ServiceStatus represents the health of an external service (e.g. Kafka, database). +type ServiceStatus struct { + Status string `json:"status"` // "up", "down", "unknown" + Message string `json:"message,omitempty"` +} + type BaseHealth struct { Uptime time.Duration `json:"uptime"` ActiveReaders []string `json:"activeReaders" gorm:"type:jsonb"` @@ -29,9 +35,11 @@ type BridgeHealth struct { } type Health struct { - Location LocationHealth `gorm:"embedded;embeddedPrefix:loc_"` - Decoder DecoderHealth `gorm:"embedded;embeddedPrefix:dec_"` - Bridge BridgeHealth `gorm:"embedded;embeddedPrefix:brg_"` + Location LocationHealth `json:"location" gorm:"embedded;embeddedPrefix:loc_"` + Decoder DecoderHealth `json:"decoder" gorm:"embedded;embeddedPrefix:dec_"` + Bridge BridgeHealth `json:"bridge" gorm:"embedded;embeddedPrefix:brg_"` + Kafka ServiceStatus `json:"kafka"` + Database ServiceStatus `json:"database"` } func (b *BaseHealth) GetUptime(startTime time.Time) { diff --git a/location b/location index b3da586..074ee91 100755 Binary files a/location and b/location differ diff --git a/server b/server index 58645a7..f0241d7 100755 Binary files a/server and b/server differ