You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

77 lines
2.5 KiB

  1. package server
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log/slog"
  7. "time"
  8. "github.com/AFASystems/presence/internal/pkg/config"
  9. "github.com/AFASystems/presence/internal/pkg/model"
  10. "github.com/AFASystems/presence/internal/pkg/service"
  11. "github.com/segmentio/kafka-go"
  12. )
  13. // RunEventLoop runs the server event loop until ctx is cancelled.
  14. // Handles: location events -> LocationToBeaconService, alert events -> update tracker in DB, ticker -> publish trackers to mqtt, health ticker -> Kafka and DB status.
  15. func RunEventLoop(ctx context.Context, a *ServerApp) {
  16. beaconTicker := time.NewTicker(config.MEDIUM_TICKER_INTERVAL)
  17. defer beaconTicker.Stop()
  18. healthCheckTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL)
  19. defer healthCheckTicker.Stop()
  20. for {
  21. select {
  22. case <-ctx.Done():
  23. return
  24. case <-healthCheckTicker.C:
  25. kafkaSt := CheckKafkaHealth(ctx, a.Cfg.KafkaURL)
  26. dbSt := CheckDBHealth(ctx, a.DB)
  27. a.AppState.UpdateServerHealth(kafkaSt, dbSt)
  28. case msg := <-a.ChHealthLocation:
  29. a.AppState.UpdateLocationHealth(msg)
  30. case msg := <-a.ChHealthDecoder:
  31. a.AppState.UpdateDecoderHealth(msg)
  32. case msg := <-a.ChHealthBridge:
  33. a.AppState.UpdateBridgeHealth(msg)
  34. case msg := <-a.ChLoc:
  35. switch msg.Method {
  36. case "Standard":
  37. service.LocationToBeaconService(msg, a.DB, a.KafkaManager.GetWriter("alert"), ctx)
  38. case "AI":
  39. service.LocationToBeaconServiceAI(msg, a.DB, a.KafkaManager.GetWriter("alert"), ctx)
  40. default:
  41. slog.Error("unknown method", "method", msg.Method)
  42. continue
  43. }
  44. case msg := <-a.ChEvents:
  45. id := msg.ID
  46. if err := a.DB.First(&model.Tracker{}, "id = ?", id).Error; err != nil {
  47. slog.Error("decoder event for untracked beacon", "id", id)
  48. continue
  49. }
  50. if err := a.DB.Updates(&model.Tracker{ID: id, Battery: msg.Battery, Temperature: msg.Temperature}).Error; err != nil {
  51. slog.Error("saving decoder event for beacon", "id", id, "err", err)
  52. continue
  53. }
  54. if msg.Type == "Eddystone" && msg.Battery < 3000 {
  55. fmt.Printf("Sending alert for battery low: %+v\n", msg)
  56. service.SendAlert(id, "BatteryLow", a.KafkaManager.GetWriter("alert"), ctx, a.DB)
  57. }
  58. case <-beaconTicker.C:
  59. var list []model.Tracker
  60. a.DB.Find(&list)
  61. eMsg, err := json.Marshal(list)
  62. if err != nil {
  63. slog.Error("marshaling trackers list", "err", err)
  64. continue
  65. }
  66. if err := a.KafkaManager.GetWriter("mqtt").WriteMessages(ctx, kafka.Message{Value: eMsg}); err != nil {
  67. slog.Error("writing trackers to mqtt topic", "err", err)
  68. }
  69. }
  70. }
  71. }