No puede seleccionar más de 25 temas. Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.
 
 
 
 

242 líneas
7.7 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "log"
  8. "log/slog"
  9. "net/http"
  10. "os"
  11. "os/signal"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "time"
  16. "github.com/AFASystems/presence/internal/pkg/apiclient"
  17. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  18. "github.com/AFASystems/presence/internal/pkg/config"
  19. "github.com/AFASystems/presence/internal/pkg/controller"
  20. "github.com/AFASystems/presence/internal/pkg/database"
  21. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  22. "github.com/AFASystems/presence/internal/pkg/model"
  23. "github.com/AFASystems/presence/internal/pkg/service"
  24. "github.com/gorilla/handlers"
  25. "github.com/gorilla/mux"
  26. "github.com/gorilla/websocket"
  27. )
  28. var upgrader = websocket.Upgrader{
  29. ReadBufferSize: 1024,
  30. WriteBufferSize: 1024,
  31. }
  32. var _ io.Writer = (*os.File)(nil)
  33. var wg sync.WaitGroup
  34. func main() {
  35. cfg := config.Load()
  36. appState := appcontext.NewAppState()
  37. // Create log file
  38. logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
  39. if err != nil {
  40. log.Fatalf("Failed to open log file: %v\n", err)
  41. }
  42. // shell and log file multiwriter
  43. w := io.MultiWriter(os.Stderr, logFile)
  44. logger := slog.New(slog.NewJSONHandler(w, nil))
  45. slog.SetDefault(logger)
  46. // define context
  47. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  48. defer stop()
  49. db, err := database.Connect(cfg)
  50. if err != nil {
  51. log.Fatalf("Failed to open database connection: %v\n", err)
  52. }
  53. headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
  54. originsOk := handlers.AllowedOrigins([]string{"*"})
  55. methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})
  56. writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons")
  57. settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings")
  58. alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alert")
  59. slog.Info("Kafka writers topics: apibeacons, settings initialized")
  60. if err := apiclient.UpdateDB(db, ctx, cfg, writer); err != nil {
  61. fmt.Printf("Error in getting token: %v\n", err)
  62. }
  63. locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server")
  64. alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
  65. slog.Info("Kafka readers topics: locevents, alertbeacons initialized")
  66. chLoc := make(chan model.HTTPLocation, 200)
  67. chEvents := make(chan model.BeaconEvent, 500)
  68. wg.Add(2)
  69. go kafkaclient.Consume(locationReader, chLoc, ctx, &wg)
  70. go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg)
  71. r := mux.NewRouter()
  72. r.HandleFunc("/api/settings", controller.SettingsListController(appState, ctx)).Methods("GET")
  73. r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, ctx)).Methods("POST")
  74. r.HandleFunc("/reslevis/getGateways", controller.GatewayListController(db)).Methods("GET")
  75. r.HandleFunc("/reslevis/postGateway", controller.GatewayAddController(db)).Methods("POST")
  76. r.HandleFunc("/reslevis/removeGateway/{id}", controller.GatewayDeleteController(db)).Methods("DELETE")
  77. r.HandleFunc("/reslevis/updateGateway/{id}", controller.GatewayUpdateController(db)).Methods("PUT")
  78. r.HandleFunc("/reslevis/getZones", controller.ZoneListController(db)).Methods("GET")
  79. r.HandleFunc("/reslevis/postZone", controller.ZoneAddController(db)).Methods("POST")
  80. r.HandleFunc("/reslevis/removeZone/{id}", controller.ZoneDeleteController(db)).Methods("DELETE")
  81. r.HandleFunc("/reslevis/updateZone", controller.ZoneUpdateController(db)).Methods("PUT")
  82. r.HandleFunc("/reslevis/getTrackerZones", controller.TrackerZoneListController(db)).Methods("GET")
  83. r.HandleFunc("/reslevis/postTrackerZone", controller.TrackerZoneAddController(db)).Methods("POST")
  84. r.HandleFunc("/reslevis/removeTrackerZone/{id}", controller.TrackerZoneDeleteController(db)).Methods("DELETE")
  85. r.HandleFunc("/reslevis/updateTrackerZone", controller.TrackerZoneUpdateController(db)).Methods("PUT")
  86. r.HandleFunc("/reslevis/getTrackers", controller.TrackerList(db)).Methods("GET")
  87. r.HandleFunc("/reslevis/postTracker", controller.TrackerAdd(db, writer, ctx)).Methods("POST")
  88. r.HandleFunc("/reslevis/removeTracker/{id}", controller.TrackerDelete(db, writer, ctx)).Methods("DELETE")
  89. r.HandleFunc("/reslevis/updateTracker", controller.TrackerUpdate(db)).Methods("PUT")
  90. wsHandler := http.HandlerFunc(serveWs(appState, ctx))
  91. restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r)
  92. mainHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  93. if strings.HasPrefix(r.URL.Path, "/api/beacons/ws") {
  94. wsHandler.ServeHTTP(w, r)
  95. return
  96. }
  97. restApiHandler.ServeHTTP(w, r)
  98. })
  99. server := http.Server{
  100. Addr: cfg.HTTPAddr,
  101. Handler: mainHandler,
  102. }
  103. go server.ListenAndServe()
  104. eventLoop:
  105. for {
  106. select {
  107. case <-ctx.Done():
  108. break eventLoop
  109. case msg := <-chLoc:
  110. service.LocationToBeaconService(msg, db, alertWriter, ctx)
  111. case msg := <-chEvents:
  112. fmt.Printf("event: %+v\n", msg)
  113. id := msg.ID
  114. if err := db.First(&model.Tracker{}, "id = ?", id).Error; err != nil {
  115. fmt.Printf("Decoder event for untracked beacon: %s\n", id)
  116. continue
  117. }
  118. if err := db.Updates(&model.Tracker{ID: id, Battery: msg.Battery}).Error; err != nil {
  119. fmt.Printf("Error in saving decoder event for beacon: %s\n", id)
  120. continue
  121. }
  122. }
  123. }
  124. if err := server.Shutdown(context.Background()); err != nil {
  125. eMsg := fmt.Sprintf("could not shutdown: %v\n", err)
  126. slog.Error(eMsg)
  127. }
  128. slog.Info("API SERVER: \n")
  129. slog.Warn("broken out of the main event loop and HTTP server shutdown\n")
  130. wg.Wait()
  131. slog.Info("All go routines have stopped, Beggining to close Kafka connections\n")
  132. appState.CleanKafkaReaders()
  133. appState.CleanKafkaWriters()
  134. slog.Info("All kafka clients shutdown, starting shutdown of valkey client")
  135. slog.Info("API server shutting down")
  136. logFile.Close()
  137. }
  138. func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc {
  139. return func(w http.ResponseWriter, r *http.Request) {
  140. ws, err := upgrader.Upgrade(w, r, nil)
  141. if err != nil {
  142. if _, ok := err.(websocket.HandshakeError); !ok {
  143. eMsg := fmt.Sprintf("could not upgrade ws connection: %v\n", err)
  144. slog.Error(eMsg)
  145. }
  146. return
  147. }
  148. wg.Add(2)
  149. go writer(ws, appstate, ctx)
  150. go reader(ws, ctx)
  151. }
  152. }
  153. func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Context) {
  154. pingTicker := time.NewTicker((60 * 9) / 10 * time.Second)
  155. beaconTicker := time.NewTicker(2 * time.Second)
  156. defer func() {
  157. pingTicker.Stop()
  158. beaconTicker.Stop()
  159. ws.Close()
  160. wg.Done()
  161. }()
  162. for {
  163. select {
  164. case <-ctx.Done():
  165. slog.Info("WebSocket writer received shutdown signal.")
  166. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  167. ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  168. return
  169. case <-beaconTicker.C:
  170. beacons := appstate.GetAllHttpResults()
  171. js, err := json.Marshal(beacons)
  172. if err != nil {
  173. js = []byte("error")
  174. }
  175. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  176. if err := ws.WriteMessage(websocket.TextMessage, js); err != nil {
  177. return
  178. }
  179. case <-pingTicker.C:
  180. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  181. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  182. return
  183. }
  184. }
  185. }
  186. }
  187. func reader(ws *websocket.Conn, ctx context.Context) {
  188. defer func() {
  189. ws.Close()
  190. wg.Done()
  191. }()
  192. ws.SetReadLimit(512)
  193. ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second))
  194. ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil })
  195. for {
  196. select {
  197. case <-ctx.Done():
  198. slog.Info("closing ws reader")
  199. return
  200. default:
  201. _, _, err := ws.ReadMessage()
  202. if err != nil {
  203. return
  204. }
  205. }
  206. }
  207. }