Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.
 
 
 
 

222 lignes
6.5 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "log"
  8. "log/slog"
  9. "net/http"
  10. "os"
  11. "os/signal"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "time"
  16. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  17. "github.com/AFASystems/presence/internal/pkg/config"
  18. "github.com/AFASystems/presence/internal/pkg/controller"
  19. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  20. "github.com/AFASystems/presence/internal/pkg/model"
  21. "github.com/AFASystems/presence/internal/pkg/service"
  22. "github.com/gorilla/handlers"
  23. "github.com/gorilla/mux"
  24. "github.com/gorilla/websocket"
  25. )
  26. var upgrader = websocket.Upgrader{
  27. ReadBufferSize: 1024,
  28. WriteBufferSize: 1024,
  29. }
  30. var _ io.Writer = (*os.File)(nil)
  31. var wg sync.WaitGroup
  32. func main() {
  33. cfg := config.Load()
  34. appState := appcontext.NewAppState()
  35. // Create log file
  36. logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
  37. if err != nil {
  38. log.Fatalf("Failed to open log file: %v\n", err)
  39. }
  40. // shell and log file multiwriter
  41. w := io.MultiWriter(os.Stderr, logFile)
  42. logger := slog.New(slog.NewJSONHandler(w, nil))
  43. slog.SetDefault(logger)
  44. // define context
  45. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  46. defer stop()
  47. headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
  48. originsOk := handlers.AllowedOrigins([]string{"*"})
  49. methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})
  50. writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons")
  51. settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings")
  52. slog.Info("Kafka writers topics: apibeacons, settings initialized")
  53. locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server")
  54. alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
  55. slog.Info("Kafka readers topics: locevents, alertbeacons initialized")
  56. client := appState.AddValkeyClient(cfg.ValkeyURL)
  57. slog.Info("Valkey DB client created")
  58. chLoc := make(chan model.HTTPLocation, 200)
  59. chEvents := make(chan model.BeaconEvent, 500)
  60. wg.Add(2)
  61. go kafkaclient.Consume(locationReader, chLoc, ctx, &wg)
  62. go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg)
  63. r := mux.NewRouter()
  64. r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsDeleteController(writer, ctx, appState)).Methods("DELETE")
  65. r.HandleFunc("/api/beacons", controller.BeaconsListController(appState)).Methods("GET")
  66. r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsListSingleController(appState)).Methods("GET")
  67. r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx, appState)).Methods("POST")
  68. r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx, appState)).Methods("PUT")
  69. r.HandleFunc("/api/addbeacons", controller.AddListOfBeaconsController(writer, ctx, appState)).Methods("POST")
  70. r.HandleFunc("/api/beaconids", controller.GetBeaconIds(appState)).Methods("GET")
  71. r.HandleFunc("/api/settings", controller.SettingsListController(appState, client, ctx)).Methods("GET")
  72. r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, client, ctx)).Methods("POST")
  73. wsHandler := http.HandlerFunc(serveWs(appState, ctx))
  74. restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r)
  75. mainHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  76. if strings.HasPrefix(r.URL.Path, "/api/beacons/ws") {
  77. wsHandler.ServeHTTP(w, r)
  78. return
  79. }
  80. restApiHandler.ServeHTTP(w, r)
  81. })
  82. server := http.Server{
  83. Addr: cfg.HTTPAddr,
  84. Handler: mainHandler,
  85. }
  86. go server.ListenAndServe()
  87. eventLoop:
  88. for {
  89. select {
  90. case <-ctx.Done():
  91. break eventLoop
  92. case msg := <-chLoc:
  93. if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil {
  94. eMsg := fmt.Sprintf("Error in writing location change to beacon: %v\n", err)
  95. slog.Error(eMsg)
  96. }
  97. case msg := <-chEvents:
  98. if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil {
  99. eMsg := fmt.Sprintf("Error in writing event change to beacon: %v\n", err)
  100. slog.Error(eMsg)
  101. }
  102. }
  103. }
  104. if err := server.Shutdown(context.Background()); err != nil {
  105. eMsg := fmt.Sprintf("could not shutdown: %v\n", err)
  106. slog.Error(eMsg)
  107. }
  108. slog.Info("API SERVER: \n")
  109. slog.Warn("broken out of the main event loop and HTTP server shutdown\n")
  110. wg.Wait()
  111. slog.Info("All go routines have stopped, Beggining to close Kafka connections\n")
  112. appState.CleanKafkaReaders()
  113. appState.CleanKafkaWriters()
  114. slog.Info("All kafka clients shutdown, starting shutdown of valkey client")
  115. appState.CleanValkeyClient()
  116. slog.Info("API server shutting down")
  117. logFile.Close()
  118. }
  119. func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc {
  120. return func(w http.ResponseWriter, r *http.Request) {
  121. ws, err := upgrader.Upgrade(w, r, nil)
  122. if err != nil {
  123. if _, ok := err.(websocket.HandshakeError); !ok {
  124. eMsg := fmt.Sprintf("could not upgrade ws connection: %v\n", err)
  125. slog.Error(eMsg)
  126. }
  127. return
  128. }
  129. wg.Add(2)
  130. go writer(ws, appstate, ctx)
  131. go reader(ws, ctx)
  132. }
  133. }
  134. func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Context) {
  135. pingTicker := time.NewTicker((60 * 9) / 10 * time.Second)
  136. beaconTicker := time.NewTicker(2 * time.Second)
  137. defer func() {
  138. pingTicker.Stop()
  139. beaconTicker.Stop()
  140. ws.Close()
  141. wg.Done()
  142. }()
  143. for {
  144. select {
  145. case <-ctx.Done():
  146. slog.Info("WebSocket writer received shutdown signal.")
  147. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  148. ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  149. return
  150. case <-beaconTicker.C:
  151. beacons := appstate.GetAllHttpResults()
  152. js, err := json.Marshal(beacons)
  153. if err != nil {
  154. js = []byte("error")
  155. }
  156. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  157. if err := ws.WriteMessage(websocket.TextMessage, js); err != nil {
  158. return
  159. }
  160. case <-pingTicker.C:
  161. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  162. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  163. return
  164. }
  165. }
  166. }
  167. }
  168. func reader(ws *websocket.Conn, ctx context.Context) {
  169. defer func() {
  170. ws.Close()
  171. wg.Done()
  172. }()
  173. ws.SetReadLimit(512)
  174. ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second))
  175. ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil })
  176. for {
  177. select {
  178. case <-ctx.Done():
  179. slog.Info("closing ws reader")
  180. return
  181. default:
  182. _, _, err := ws.ReadMessage()
  183. if err != nil {
  184. return
  185. }
  186. }
  187. }
  188. }