| @@ -13,15 +13,21 @@ import ( | |||||
| ) | ) | ||||
| // RunEventLoop runs the server event loop until ctx is cancelled. | // RunEventLoop runs the server event loop until ctx is cancelled. | ||||
| // Handles: location events -> LocationToBeaconService, alert events -> update tracker in DB, ticker -> publish trackers to mqtt. | |||||
| // Handles: location events -> LocationToBeaconService, alert events -> update tracker in DB, ticker -> publish trackers to mqtt, health ticker -> Kafka and DB status. | |||||
| func RunEventLoop(ctx context.Context, a *ServerApp) { | func RunEventLoop(ctx context.Context, a *ServerApp) { | ||||
| beaconTicker := time.NewTicker(config.MEDIUM_TICKER_INTERVAL) | beaconTicker := time.NewTicker(config.MEDIUM_TICKER_INTERVAL) | ||||
| defer beaconTicker.Stop() | defer beaconTicker.Stop() | ||||
| healthCheckTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL) | |||||
| defer healthCheckTicker.Stop() | |||||
| for { | for { | ||||
| select { | select { | ||||
| case <-ctx.Done(): | case <-ctx.Done(): | ||||
| return | return | ||||
| case <-healthCheckTicker.C: | |||||
| kafkaSt := CheckKafkaHealth(ctx, a.Cfg.KafkaURL) | |||||
| dbSt := CheckDBHealth(ctx, a.DB) | |||||
| a.AppState.UpdateServerHealth(kafkaSt, dbSt) | |||||
| case msg := <-a.ChHealthLocation: | case msg := <-a.ChHealthLocation: | ||||
| a.AppState.UpdateLocationHealth(msg) | a.AppState.UpdateLocationHealth(msg) | ||||
| case msg := <-a.ChHealthDecoder: | case msg := <-a.ChHealthDecoder: | ||||
| @@ -0,0 +1,44 @@ | |||||
| package server | |||||
| import ( | |||||
| "context" | |||||
| "strings" | |||||
| "time" | |||||
| "github.com/AFASystems/presence/internal/pkg/common/appcontext" | |||||
| "github.com/segmentio/kafka-go" | |||||
| "gorm.io/gorm" | |||||
| ) | |||||
| const healthCheckTimeout = 3 * time.Second | |||||
| // CheckKafkaHealth dials the broker and returns ServiceStatus. Uses first broker from kafkaURL (comma-separated). | |||||
| func CheckKafkaHealth(ctx context.Context, kafkaURL string) appcontext.ServiceStatus { | |||||
| if kafkaURL == "" { | |||||
| return appcontext.ServiceStatus{Status: "down", Message: "no broker URL"} | |||||
| } | |||||
| broker := strings.TrimSpace(strings.Split(kafkaURL, ",")[0]) | |||||
| tctx, cancel := context.WithTimeout(ctx, healthCheckTimeout) | |||||
| defer cancel() | |||||
| dialer := &kafka.Dialer{Timeout: healthCheckTimeout} | |||||
| conn, err := dialer.DialContext(tctx, "tcp", broker) | |||||
| if err != nil { | |||||
| return appcontext.ServiceStatus{Status: "down", Message: err.Error()} | |||||
| } | |||||
| _ = conn.Close() | |||||
| return appcontext.ServiceStatus{Status: "up"} | |||||
| } | |||||
| // CheckDBHealth pings the database and returns ServiceStatus. | |||||
| func CheckDBHealth(ctx context.Context, db *gorm.DB) appcontext.ServiceStatus { | |||||
| sqlDB, err := db.DB() | |||||
| if err != nil { | |||||
| return appcontext.ServiceStatus{Status: "down", Message: err.Error()} | |||||
| } | |||||
| tctx, cancel := context.WithTimeout(ctx, healthCheckTimeout) | |||||
| defer cancel() | |||||
| if err := sqlDB.PingContext(tctx); err != nil { | |||||
| return appcontext.ServiceStatus{Status: "down", Message: err.Error()} | |||||
| } | |||||
| return appcontext.ServiceStatus{Status: "up"} | |||||
| } | |||||
| @@ -78,6 +78,8 @@ func NewAppState() *AppState { | |||||
| ActiveBeacons: []string{}, | ActiveBeacons: []string{}, | ||||
| }, | }, | ||||
| }, | }, | ||||
| Kafka: ServiceStatus{Status: "unknown"}, | |||||
| Database: ServiceStatus{Status: "unknown"}, | |||||
| }, | }, | ||||
| } | } | ||||
| } | } | ||||
| @@ -106,6 +108,14 @@ func (m *AppState) UpdateBridgeHealth(h BridgeHealth) { | |||||
| m.mu.Unlock() | m.mu.Unlock() | ||||
| } | } | ||||
| // UpdateServerHealth updates Kafka and database status (called by server after checking broker and DB). | |||||
| func (m *AppState) UpdateServerHealth(kafka, database ServiceStatus) { | |||||
| m.mu.Lock() | |||||
| m.health.Kafka = kafka | |||||
| m.health.Database = database | |||||
| m.mu.Unlock() | |||||
| } | |||||
| func (m *AppState) GetLocationHealth(k *kafkaclient.KafkaManager) ([]byte, error) { | func (m *AppState) GetLocationHealth(k *kafkaclient.KafkaManager) ([]byte, error) { | ||||
| m.health.Location.GetUptime(m.startTime) | m.health.Location.GetUptime(m.startTime) | ||||
| m.health.Location.GetActiveReaders(k) | m.health.Location.GetActiveReaders(k) | ||||
| @@ -7,6 +7,12 @@ import ( | |||||
| "github.com/AFASystems/presence/internal/pkg/kafkaclient" | "github.com/AFASystems/presence/internal/pkg/kafkaclient" | ||||
| ) | ) | ||||
// ServiceStatus describes the health of an external dependency such as Kafka
// or the database.
type ServiceStatus struct {
	// Status is one of "up", "down" or "unknown".
	Status string `json:"status"`
	// Message carries optional detail (for example an error text) and is
	// omitted from the JSON output when empty.
	Message string `json:"message,omitempty"`
}
| type BaseHealth struct { | type BaseHealth struct { | ||||
| Uptime time.Duration `json:"uptime"` | Uptime time.Duration `json:"uptime"` | ||||
| ActiveReaders []string `json:"activeReaders" gorm:"type:jsonb"` | ActiveReaders []string `json:"activeReaders" gorm:"type:jsonb"` | ||||
| @@ -29,9 +35,11 @@ type BridgeHealth struct { | |||||
| } | } | ||||
| type Health struct { | type Health struct { | ||||
| Location LocationHealth `gorm:"embedded;embeddedPrefix:loc_"` | |||||
| Decoder DecoderHealth `gorm:"embedded;embeddedPrefix:dec_"` | |||||
| Bridge BridgeHealth `gorm:"embedded;embeddedPrefix:brg_"` | |||||
| Location LocationHealth `json:"location" gorm:"embedded;embeddedPrefix:loc_"` | |||||
| Decoder DecoderHealth `json:"decoder" gorm:"embedded;embeddedPrefix:dec_"` | |||||
| Bridge BridgeHealth `json:"bridge" gorm:"embedded;embeddedPrefix:brg_"` | |||||
| Kafka ServiceStatus `json:"kafka"` | |||||
| Database ServiceStatus `json:"database"` | |||||
| } | } | ||||
| func (b *BaseHealth) GetUptime(startTime time.Time) { | func (b *BaseHealth) GetUptime(startTime time.Time) { | ||||