Nelze vybrat více než 25 témat Téma musí začínat písmenem nebo číslem, může obsahovat pomlčky („-“) a může být dlouhé až 35 znaků.
 
 
 
 

155 řádky
5.0 KiB

  1. package server
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"os"
	"sync"
	"time"

	"github.com/AFASystems/presence/internal/pkg/apiclient"
	"github.com/AFASystems/presence/internal/pkg/common/appcontext"
	"github.com/AFASystems/presence/internal/pkg/config"
	"github.com/AFASystems/presence/internal/pkg/database"
	"github.com/AFASystems/presence/internal/pkg/kafkaclient"
	"github.com/AFASystems/presence/internal/pkg/logger"
	"github.com/AFASystems/presence/internal/pkg/model"
	"github.com/AFASystems/presence/internal/pkg/service"

	"gorm.io/gorm"
)
// ServerApp holds dependencies and state for the server service.
//
// Lifecycle: construct with New, then call Init(ctx), Run(ctx), and
// finally Shutdown (see New's doc comment).
type ServerApp struct {
	Cfg          *config.Config            // service configuration passed to New
	DB           *gorm.DB                  // GORM handle opened by database.Connect in New
	KafkaManager *kafkaclient.KafkaManager // owns Kafka writers (created in New) and readers (created in Init)
	AppState     *appcontext.AppState      // shared application state, created in New

	// Channels below are created in Init and fed by the Kafka consumer
	// goroutines; presumably drained by RunEventLoop — confirm in its source.
	ChLoc            chan model.HTTPLocation       // "locevents" topic messages
	ChEvents         chan appcontext.BeaconEvent   // "alertbeacons" topic messages
	ChHealthLocation chan appcontext.LocationHealth // "healthlocation" topic messages
	ChHealthDecoder  chan appcontext.DecoderHealth  // "healthdecoder" topic messages
	ChHealthBridge   chan appcontext.BridgeHealth   // "healthbridge" topic messages

	// NOTE(review): storing a context in a struct is discouraged by Go
	// convention (pass ctx explicitly instead); Init assigns it here.
	ctx context.Context

	Server  *http.Server   // HTTP server, built in Init, started in Run, stopped in Shutdown
	Cleanup func()         // logger cleanup returned by logger.CreateLogger; invoked in Shutdown
	wg      sync.WaitGroup // tracks the five consumer goroutines started in Init
}
  37. // New creates a ServerApp: loads config, creates logger, connects DB, creates Kafka manager and writers.
  38. // Caller must call Init(ctx) then Run(ctx) then Shutdown().
  39. func New(cfg *config.Config) (*ServerApp, error) {
  40. srvLogger, cleanup := logger.CreateLogger("server.log")
  41. slog.SetDefault(srvLogger)
  42. db, err := database.Connect(cfg)
  43. if err != nil {
  44. cleanup()
  45. return nil, fmt.Errorf("database: %w", err)
  46. }
  47. appState := appcontext.NewAppState()
  48. kafkaManager := kafkaclient.InitKafkaManager()
  49. writerTopics := []string{"apibeacons", "alert", "mqtt", "settings", "parser"}
  50. kafkaManager.PopulateKafkaManager(cfg.KafkaURL, "", writerTopics)
  51. slog.Info("Kafka writers initialized", "topics", writerTopics)
  52. return &ServerApp{
  53. Cfg: cfg,
  54. DB: db,
  55. KafkaManager: kafkaManager,
  56. AppState: appState,
  57. Cleanup: cleanup,
  58. }, nil
  59. }
  60. // Init loads config from file, seeds DB, runs UpdateDB, adds Kafka readers and starts consumers.
  61. func (a *ServerApp) Init(ctx context.Context) error {
  62. a.ctx = ctx
  63. configFile, err := os.Open(a.Cfg.ConfigPath)
  64. if err != nil {
  65. return fmt.Errorf("config file: %w", err)
  66. }
  67. defer configFile.Close()
  68. b, err := io.ReadAll(configFile)
  69. if err != nil {
  70. return fmt.Errorf("read config: %w", err)
  71. }
  72. var configs []model.Config
  73. if err := json.Unmarshal(b, &configs); err != nil {
  74. return fmt.Errorf("unmarshal config: %w", err)
  75. }
  76. for _, c := range configs {
  77. a.DB.Create(&c)
  78. }
  79. a.DB.Find(&configs)
  80. for _, c := range configs {
  81. kp := model.KafkaParser{ID: "add", Config: c}
  82. if err := service.SendParserConfig(kp, a.KafkaManager.GetWriter("parser"), ctx); err != nil {
  83. slog.Error("sending parser config to kafka", "err", err, "name", c.Name)
  84. }
  85. }
  86. if err := apiclient.UpdateDB(a.DB, ctx, a.Cfg, a.KafkaManager.GetWriter("apibeacons"), a.AppState); err != nil {
  87. slog.Error("UpdateDB", "err", err)
  88. }
  89. readerTopics := []string{"locevents", "alertbeacons", "healthlocation", "healthdecoder", "healthbridge"}
  90. a.KafkaManager.PopulateKafkaManager(a.Cfg.KafkaURL, "server", readerTopics)
  91. slog.Info("Kafka readers initialized", "topics", readerTopics)
  92. a.ChLoc = make(chan model.HTTPLocation, config.SMALL_CHANNEL_SIZE)
  93. a.ChEvents = make(chan appcontext.BeaconEvent, config.MEDIUM_CHANNEL_SIZE)
  94. a.ChHealthLocation = make(chan appcontext.LocationHealth, config.SMALL_CHANNEL_SIZE)
  95. a.ChHealthDecoder = make(chan appcontext.DecoderHealth, config.SMALL_CHANNEL_SIZE)
  96. a.ChHealthBridge = make(chan appcontext.BridgeHealth, config.SMALL_CHANNEL_SIZE)
  97. a.wg.Add(5)
  98. go kafkaclient.Consume(a.KafkaManager.GetReader("locevents"), a.ChLoc, ctx, &a.wg)
  99. go kafkaclient.Consume(a.KafkaManager.GetReader("alertbeacons"), a.ChEvents, ctx, &a.wg)
  100. go kafkaclient.Consume(a.KafkaManager.GetReader("healthlocation"), a.ChHealthLocation, ctx, &a.wg)
  101. go kafkaclient.Consume(a.KafkaManager.GetReader("healthdecoder"), a.ChHealthDecoder, ctx, &a.wg)
  102. go kafkaclient.Consume(a.KafkaManager.GetReader("healthbridge"), a.ChHealthBridge, ctx, &a.wg)
  103. a.Server = &http.Server{
  104. Addr: a.Cfg.HTTPAddr,
  105. Handler: a.RegisterRoutes(),
  106. }
  107. return nil
  108. }
  109. // Run starts the HTTP server and runs the event loop until ctx is cancelled.
  110. func (a *ServerApp) Run(ctx context.Context) {
  111. go func() {
  112. if err := a.Server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  113. slog.Error("HTTP server", "err", err)
  114. }
  115. }()
  116. RunEventLoop(ctx, a)
  117. }
  118. // Shutdown stops the HTTP server, waits for consumers, and cleans up Kafka and logger.
  119. func (a *ServerApp) Shutdown() {
  120. if a.Server != nil {
  121. if err := a.Server.Shutdown(context.Background()); err != nil {
  122. slog.Error("server shutdown", "err", err)
  123. }
  124. slog.Info("HTTP server stopped")
  125. }
  126. a.wg.Wait()
  127. slog.Info("Kafka consumers stopped")
  128. a.KafkaManager.CleanKafkaReaders()
  129. a.KafkaManager.CleanKafkaWriters()
  130. if a.Cleanup != nil {
  131. a.Cleanup()
  132. }
  133. slog.Info("server shutdown complete")
  134. }