Non puoi selezionare più di 25 argomenti Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.
 
 
 
 

156 righe
5.0 KiB

  1. package server
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/AFASystems/presence/internal/pkg/apiclient"
	"github.com/AFASystems/presence/internal/pkg/common/appcontext"
	"github.com/AFASystems/presence/internal/pkg/config"
	"github.com/AFASystems/presence/internal/pkg/database"
	"github.com/AFASystems/presence/internal/pkg/kafkaclient"
	"github.com/AFASystems/presence/internal/pkg/logger"
	"github.com/AFASystems/presence/internal/pkg/model"
	"github.com/AFASystems/presence/internal/pkg/service"

	"gorm.io/gorm"
	"gorm.io/gorm/clause"
)
// ServerApp holds dependencies and state for the server service.
//
// Lifecycle: New -> Init(ctx) -> Run(ctx) -> Shutdown (see New's doc).
// Must be used via pointer: it embeds a sync.WaitGroup and must not be copied.
type ServerApp struct {
	Cfg          *config.Config            // service configuration (DB, Kafka URL, HTTP address, config path)
	DB           *gorm.DB                  // database handle opened by New
	KafkaManager *kafkaclient.KafkaManager // owns Kafka writers (created in New) and readers (created in Init)
	AppState     *appcontext.AppState      // shared in-memory application state

	// Channels fed by the Kafka consumer goroutines started in Init.
	ChLoc            chan model.HTTPLocation       // "locevents" topic
	ChEvents         chan appcontext.BeaconEvent   // "alertbeacons" topic
	ChHealthLocation chan appcontext.LocationHealth // "healthlocation" topic
	ChHealthDecoder  chan appcontext.DecoderHealth  // "healthdecoder" topic
	ChHealthBridge   chan appcontext.BridgeHealth   // "healthbridge" topic

	ctx     context.Context // stored by Init; NOTE(review): storing ctx in a struct is a Go anti-pattern — appears unused here, confirm RunEventLoop/handlers before removing
	Server  *http.Server    // HTTP server built in Init, started in Run, stopped in Shutdown
	Cleanup func()          // logger cleanup returned by logger.CreateLogger; invoked by Shutdown
	wg      sync.WaitGroup  // tracks the five consumer goroutines; Shutdown waits on it
}
  38. // New creates a ServerApp: loads config, creates logger, connects DB, creates Kafka manager and writers.
  39. // Caller must call Init(ctx) then Run(ctx) then Shutdown().
  40. func New(cfg *config.Config) (*ServerApp, error) {
  41. srvLogger, cleanup := logger.CreateLogger("server.log")
  42. slog.SetDefault(srvLogger)
  43. db, err := database.Connect(cfg)
  44. if err != nil {
  45. cleanup()
  46. return nil, fmt.Errorf("database: %w", err)
  47. }
  48. appState := appcontext.NewAppState()
  49. kafkaManager := kafkaclient.InitKafkaManager()
  50. writerTopics := []string{"apibeacons", "alert", "mqtt", "settings", "parser"}
  51. kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)
  52. slog.Info("Kafka writers initialized", "topics", writerTopics)
  53. return &ServerApp{
  54. Cfg: cfg,
  55. DB: db,
  56. KafkaManager: kafkaManager,
  57. AppState: appState,
  58. Cleanup: cleanup,
  59. }, nil
  60. }
  61. // Init loads config from file, seeds DB, runs UpdateDB, adds Kafka readers and starts consumers.
  62. func (a *ServerApp) Init(ctx context.Context) error {
  63. a.ctx = ctx
  64. configFile, err := os.Open(a.Cfg.ConfigPath)
  65. if err != nil {
  66. return fmt.Errorf("config file: %w", err)
  67. }
  68. defer configFile.Close()
  69. b, err := io.ReadAll(configFile)
  70. if err != nil {
  71. return fmt.Errorf("read config: %w", err)
  72. }
  73. var configs []model.Config
  74. if err := json.Unmarshal(b, &configs); err != nil {
  75. return fmt.Errorf("unmarshal config: %w", err)
  76. }
  77. for _, c := range configs {
  78. a.DB.Clauses(clause.OnConflict{DoNothing: true}).Create(&c)
  79. }
  80. a.DB.Find(&configs)
  81. for _, c := range configs {
  82. kp := model.KafkaParser{ID: "add", Config: c}
  83. if err := service.SendParserConfig(kp, a.KafkaManager.GetWriter("parser"), ctx); err != nil {
  84. slog.Error("sending parser config to kafka", "err", err, "name", c.Name)
  85. }
  86. }
  87. if err := apiclient.UpdateDB(a.DB, ctx, a.Cfg, a.KafkaManager.GetWriter("apibeacons"), a.AppState); err != nil {
  88. slog.Error("UpdateDB", "err", err)
  89. }
  90. readerTopics := []string{"locevents", "alertbeacons", "healthlocation", "healthdecoder", "healthbridge"}
  91. a.KafkaManager.PopulateKafkaManager(a.Cfg.KafkaURL, "server", readerTopics)
  92. slog.Info("Kafka readers initialized", "topics", readerTopics)
  93. a.ChLoc = make(chan model.HTTPLocation, config.SMALL_CHANNEL_SIZE)
  94. a.ChEvents = make(chan appcontext.BeaconEvent, config.MEDIUM_CHANNEL_SIZE)
  95. a.ChHealthLocation = make(chan appcontext.LocationHealth, config.SMALL_CHANNEL_SIZE)
  96. a.ChHealthDecoder = make(chan appcontext.DecoderHealth, config.SMALL_CHANNEL_SIZE)
  97. a.ChHealthBridge = make(chan appcontext.BridgeHealth, config.SMALL_CHANNEL_SIZE)
  98. a.wg.Add(5)
  99. go kafkaclient.Consume(a.KafkaManager.GetReader("locevents"), a.ChLoc, ctx, &a.wg)
  100. go kafkaclient.Consume(a.KafkaManager.GetReader("alertbeacons"), a.ChEvents, ctx, &a.wg)
  101. go kafkaclient.Consume(a.KafkaManager.GetReader("healthlocation"), a.ChHealthLocation, ctx, &a.wg)
  102. go kafkaclient.Consume(a.KafkaManager.GetReader("healthdecoder"), a.ChHealthDecoder, ctx, &a.wg)
  103. go kafkaclient.Consume(a.KafkaManager.GetReader("healthbridge"), a.ChHealthBridge, ctx, &a.wg)
  104. a.Server = &http.Server{
  105. Addr: a.Cfg.HTTPAddr,
  106. Handler: a.RegisterRoutes(),
  107. }
  108. return nil
  109. }
  110. // Run starts the HTTP server and runs the event loop until ctx is cancelled.
  111. func (a *ServerApp) Run(ctx context.Context) {
  112. go func() {
  113. if err := a.Server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  114. slog.Error("HTTP server", "err", err)
  115. }
  116. }()
  117. RunEventLoop(ctx, a)
  118. }
  119. // Shutdown stops the HTTP server, waits for consumers, and cleans up Kafka and logger.
  120. func (a *ServerApp) Shutdown() {
  121. if a.Server != nil {
  122. if err := a.Server.Shutdown(context.Background()); err != nil {
  123. slog.Error("server shutdown", "err", err)
  124. }
  125. slog.Info("HTTP server stopped")
  126. }
  127. a.wg.Wait()
  128. slog.Info("Kafka consumers stopped")
  129. a.KafkaManager.CleanKafkaReaders()
  130. a.KafkaManager.CleanKafkaWriters()
  131. if a.Cleanup != nil {
  132. a.Cleanup()
  133. }
  134. slog.Info("server shutdown complete")
  135. }