package server import ( "context" "encoding/json" "log/slog" "time" "github.com/AFASystems/presence/internal/pkg/config" "github.com/AFASystems/presence/internal/pkg/model" "github.com/AFASystems/presence/internal/pkg/service" "github.com/segmentio/kafka-go" ) // RunEventLoop runs the server event loop until ctx is cancelled. // Handles: location events -> LocationToBeaconService, alert events -> update tracker in DB, ticker -> publish trackers to mqtt. func RunEventLoop(ctx context.Context, a *ServerApp) { beaconTicker := time.NewTicker(config.MEDIUM_TICKER_INTERVAL) defer beaconTicker.Stop() for { select { case <-ctx.Done(): return case msg := <-a.ChHealthLocation: a.AppState.UpdateLocationHealth(msg) case msg := <-a.ChHealthDecoder: a.AppState.UpdateDecoderHealth(msg) case msg := <-a.ChHealthBridge: a.AppState.UpdateBridgeHealth(msg) case msg := <-a.ChLoc: switch msg.Method { case "Standard": service.LocationToBeaconService(msg, a.DB, a.KafkaManager.GetWriter("alert"), ctx) case "AI": service.LocationToBeaconServiceAI(msg, a.DB, a.KafkaManager.GetWriter("alert"), ctx) default: slog.Error("unknown method", "method", msg.Method) continue } case msg := <-a.ChEvents: id := msg.ID if err := a.DB.First(&model.Tracker{}, "id = ?", id).Error; err != nil { slog.Error("decoder event for untracked beacon", "id", id) continue } if err := a.DB.Updates(&model.Tracker{ID: id, Battery: msg.Battery, Temperature: msg.Temperature}).Error; err != nil { slog.Error("saving decoder event for beacon", "id", id, "err", err) continue } case <-beaconTicker.C: var list []model.Tracker a.DB.Find(&list) eMsg, err := json.Marshal(list) if err != nil { slog.Error("marshaling trackers list", "err", err) continue } if err := a.KafkaManager.GetWriter("mqtt").WriteMessages(ctx, kafka.Message{Value: eMsg}); err != nil { slog.Error("writing trackers to mqtt topic", "err", err) } } } }