You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

124 lines
3.8 KiB

  1. package location
  2. import (
  3. "context"
  4. "encoding/json"
  5. "log/slog"
  6. "sync"
  7. "time"
  8. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  9. "github.com/AFASystems/presence/internal/pkg/config"
  10. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  11. pkglocation "github.com/AFASystems/presence/internal/pkg/location"
  12. "github.com/AFASystems/presence/internal/pkg/logger"
  13. "github.com/AFASystems/presence/internal/pkg/model"
  14. "github.com/segmentio/kafka-go"
  15. )
// LocationApp holds dependencies for the location service.
//
// Instances are built by New, driven by Run, and torn down by Shutdown.
// The struct embeds a sync.WaitGroup, so it must be used through a pointer
// and never copied.
type LocationApp struct {
	// Cfg is the configuration handed to New; Run forwards it to the
	// Inferencer on every "ai" tick.
	Cfg *config.Config
	// KafkaManager owns the Kafka readers ("rawbeacons", "settings") and the
	// writer ("locevents") that New registers.
	KafkaManager *kafkaclient.KafkaManager
	// AppState is the shared application state; Run reads the current settings
	// from it and applies settings updates to it.
	AppState *appcontext.AppState
	// Inferencer produces locations when the current algorithm is "ai".
	Inferencer pkglocation.Inferencer
	// ChRaw carries beacon advertisements from the "rawbeacons" consumer to
	// the event loop in Run.
	ChRaw chan appcontext.BeaconAdvertisement
	// ChSettings carries settings updates from the "settings" consumer to the
	// event loop in Run.
	ChSettings chan map[string]any
	// Cleanup is the hook returned by logger.CreateLogger; Shutdown calls it
	// when it is non-nil.
	Cleanup func()
	// wg is handed to the consumer goroutines started by Run and waited on by
	// Shutdown.
	wg sync.WaitGroup
}
  27. // New creates a LocationApp with Kafka readers (rawbeacons, settings) and writer (locevents).
  28. func New(cfg *config.Config) (*LocationApp, error) {
  29. appState := appcontext.NewAppState()
  30. kafkaManager := kafkaclient.InitKafkaManager()
  31. srvLogger, cleanup := logger.CreateLogger("location.log")
  32. slog.SetDefault(srvLogger)
  33. readerTopics := []string{"rawbeacons", "settings"}
  34. writerTopics := []string{"locevents"}
  35. kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "location", readerTopics)
  36. kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)
  37. slog.Info("location service initialized", "readers", readerTopics, "writers", writerTopics)
  38. return &LocationApp{
  39. Cfg: cfg,
  40. KafkaManager: kafkaManager,
  41. AppState: appState,
  42. Inferencer: pkglocation.NewDefaultInferencer(cfg.TLSInsecureSkipVerify),
  43. ChRaw: make(chan appcontext.BeaconAdvertisement, config.LARGE_CHANNEL_SIZE),
  44. ChSettings: make(chan map[string]any, config.SMALL_CHANNEL_SIZE),
  45. Cleanup: cleanup,
  46. }, nil
  47. }
  48. // Run starts consumers and the event loop until ctx is cancelled.
  49. func (a *LocationApp) Run(ctx context.Context) {
  50. a.wg.Add(2)
  51. go kafkaclient.Consume(a.KafkaManager.GetReader("rawbeacons"), a.ChRaw, ctx, &a.wg)
  52. go kafkaclient.Consume(a.KafkaManager.GetReader("settings"), a.ChSettings, ctx, &a.wg)
  53. locTicker := time.NewTicker(config.LARGE_TICKER_INTERVAL)
  54. defer locTicker.Stop()
  55. for {
  56. select {
  57. case <-ctx.Done():
  58. return
  59. case <-locTicker.C:
  60. settings := a.AppState.GetSettings()
  61. slog.Info("current algorithm", "algorithm", settings.CurrentAlgorithm)
  62. switch settings.CurrentAlgorithm {
  63. case "filter":
  64. pkglocation.GetLikelyLocations(a.AppState, a.KafkaManager.GetWriter("locevents"))
  65. case "ai":
  66. inferred, err := a.Inferencer.Infer(ctx, a.Cfg)
  67. if err != nil {
  68. slog.Error("AI inference", "err", err)
  69. continue
  70. }
  71. for _, item := range inferred.Items {
  72. r := model.HTTPLocation{
  73. Method: "AI",
  74. Y: item.Y,
  75. X: item.X,
  76. Z: item.Z,
  77. MAC: item.Mac,
  78. LastSeen: time.Now().Unix(),
  79. }
  80. js, err := json.Marshal(r)
  81. if err != nil {
  82. slog.Error("marshaling location", "err", err, "beacon_id", item.Mac)
  83. continue
  84. }
  85. if err := a.KafkaManager.GetWriter("locevents").WriteMessages(ctx, kafka.Message{Value: js}); err != nil {
  86. slog.Error("sending kafka location message", "err", err, "beacon_id", item.Mac)
  87. }
  88. }
  89. slog.Info("AI algorithm", "count", inferred.Count, "items", len(inferred.Items))
  90. }
  91. case msg := <-a.ChRaw:
  92. pkglocation.AssignBeaconToList(msg, a.AppState)
  93. case msg := <-a.ChSettings:
  94. slog.Info("settings update", "msg", msg)
  95. a.AppState.UpdateSettings(msg)
  96. }
  97. }
  98. }
  99. // Shutdown waits for consumers and cleans up Kafka and logger.
  100. func (a *LocationApp) Shutdown() {
  101. a.wg.Wait()
  102. a.KafkaManager.CleanKafkaReaders()
  103. a.KafkaManager.CleanKafkaWriters()
  104. if a.Cleanup != nil {
  105. a.Cleanup()
  106. }
  107. slog.Info("location service shutdown complete")
  108. }