You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

72 line
2.3 KiB

  1. package server
  2. import (
  3. "context"
  4. "encoding/json"
  5. "log/slog"
  6. "time"
  7. "github.com/AFASystems/presence/internal/pkg/config"
  8. "github.com/AFASystems/presence/internal/pkg/model"
  9. "github.com/AFASystems/presence/internal/pkg/service"
  10. "github.com/segmentio/kafka-go"
  11. )
  12. // RunEventLoop runs the server event loop until ctx is cancelled.
  13. // Handles: location events -> LocationToBeaconService, alert events -> update tracker in DB, ticker -> publish trackers to mqtt, health ticker -> Kafka and DB status.
  14. func RunEventLoop(ctx context.Context, a *ServerApp) {
  15. beaconTicker := time.NewTicker(config.MEDIUM_TICKER_INTERVAL)
  16. defer beaconTicker.Stop()
  17. healthCheckTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL)
  18. defer healthCheckTicker.Stop()
  19. for {
  20. select {
  21. case <-ctx.Done():
  22. return
  23. case <-healthCheckTicker.C:
  24. kafkaSt := CheckKafkaHealth(ctx, a.Cfg.KafkaURL)
  25. dbSt := CheckDBHealth(ctx, a.DB)
  26. a.AppState.UpdateServerHealth(kafkaSt, dbSt)
  27. case msg := <-a.ChHealthLocation:
  28. a.AppState.UpdateLocationHealth(msg)
  29. case msg := <-a.ChHealthDecoder:
  30. a.AppState.UpdateDecoderHealth(msg)
  31. case msg := <-a.ChHealthBridge:
  32. a.AppState.UpdateBridgeHealth(msg)
  33. case msg := <-a.ChLoc:
  34. switch msg.Method {
  35. case "Standard":
  36. service.LocationToBeaconService(msg, a.DB, a.KafkaManager.GetWriter("alert"), ctx)
  37. case "AI":
  38. service.LocationToBeaconServiceAI(msg, a.DB, a.KafkaManager.GetWriter("alert"), ctx)
  39. default:
  40. slog.Error("unknown method", "method", msg.Method)
  41. continue
  42. }
  43. case msg := <-a.ChEvents:
  44. id := msg.ID
  45. if err := a.DB.First(&model.Tracker{}, "id = ?", id).Error; err != nil {
  46. slog.Error("decoder event for untracked beacon", "id", id)
  47. continue
  48. }
  49. if err := a.DB.Updates(&model.Tracker{ID: id, Battery: msg.Battery, Temperature: msg.Temperature}).Error; err != nil {
  50. slog.Error("saving decoder event for beacon", "id", id, "err", err)
  51. continue
  52. }
  53. case <-beaconTicker.C:
  54. var list []model.Tracker
  55. a.DB.Find(&list)
  56. eMsg, err := json.Marshal(list)
  57. if err != nil {
  58. slog.Error("marshaling trackers list", "err", err)
  59. continue
  60. }
  61. if err := a.KafkaManager.GetWriter("mqtt").WriteMessages(ctx, kafka.Message{Value: eMsg}); err != nil {
  62. slog.Error("writing trackers to mqtt topic", "err", err)
  63. }
  64. }
  65. }
  66. }