Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.
 
 
 
 

257 rader
8.2 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "log"
  8. "log/slog"
  9. "net/http"
  10. "os"
  11. "os/signal"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "time"
  16. "github.com/AFASystems/presence/internal/pkg/apiclient"
  17. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  18. "github.com/AFASystems/presence/internal/pkg/config"
  19. "github.com/AFASystems/presence/internal/pkg/controller"
  20. "github.com/AFASystems/presence/internal/pkg/database"
  21. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  22. "github.com/AFASystems/presence/internal/pkg/model"
  23. "github.com/gorilla/handlers"
  24. "github.com/gorilla/mux"
  25. "github.com/gorilla/websocket"
  26. )
  27. var upgrader = websocket.Upgrader{
  28. ReadBufferSize: 1024,
  29. WriteBufferSize: 1024,
  30. }
  31. var _ io.Writer = (*os.File)(nil)
  32. var wg sync.WaitGroup
  33. func main() {
  34. cfg := config.Load()
  35. appState := appcontext.NewAppState()
  36. // Create log file
  37. logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
  38. if err != nil {
  39. log.Fatalf("Failed to open log file: %v\n", err)
  40. }
  41. // shell and log file multiwriter
  42. w := io.MultiWriter(os.Stderr, logFile)
  43. logger := slog.New(slog.NewJSONHandler(w, nil))
  44. slog.SetDefault(logger)
  45. // define context
  46. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  47. defer stop()
  48. db, err := database.Connect(cfg)
  49. if err != nil {
  50. log.Fatalf("Failed to open database connection: %v\n", err)
  51. }
  52. headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
  53. originsOk := handlers.AllowedOrigins([]string{"*"})
  54. methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})
  55. writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons")
  56. settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings")
  57. slog.Info("Kafka writers topics: apibeacons, settings initialized")
  58. if err := apiclient.UpdateDB(db, ctx, cfg, writer); err != nil {
  59. fmt.Printf("Error in getting token: %v\n", err)
  60. }
  61. locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server")
  62. alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
  63. slog.Info("Kafka readers topics: locevents, alertbeacons initialized")
  64. chLoc := make(chan model.HTTPLocation, 200)
  65. chEvents := make(chan model.BeaconEvent, 500)
  66. wg.Add(2)
  67. go kafkaclient.Consume(locationReader, chLoc, ctx, &wg)
  68. go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg)
  69. r := mux.NewRouter()
  70. r.HandleFunc("/api/settings", controller.SettingsListController(appState, ctx)).Methods("GET")
  71. r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, ctx)).Methods("POST")
  72. r.HandleFunc("/reslevis/getGateways", controller.GatewayListController(db)).Methods("GET")
  73. r.HandleFunc("/reslevis/postGateway", controller.GatewayAddController(db)).Methods("POST")
  74. r.HandleFunc("/reslevis/removeGateway/{id}", controller.GatewayDeleteController(db)).Methods("DELETE")
  75. r.HandleFunc("/reslevis/updateGateway/{id}", controller.GatewayUpdateController(db)).Methods("PUT")
  76. r.HandleFunc("/reslevis/getZones", controller.ZoneListController(db)).Methods("GET")
  77. r.HandleFunc("/reslevis/postZone", controller.ZoneAddController(db)).Methods("POST")
  78. r.HandleFunc("/reslevis/removeZone/{id}", controller.ZoneDeleteController(db)).Methods("DELETE")
  79. r.HandleFunc("/reslevis/updateZone", controller.ZoneUpdateController(db)).Methods("PUT")
  80. r.HandleFunc("/reslevis/getTrackerZones", controller.TrackerZoneListController(db)).Methods("GET")
  81. r.HandleFunc("/reslevis/postTrackerZone", controller.TrackerZoneAddController(db)).Methods("POST")
  82. r.HandleFunc("/reslevis/removeTrackerZone/{id}", controller.TrackerZoneDeleteController(db)).Methods("DELETE")
  83. r.HandleFunc("/reslevis/updateTrackerZone", controller.TrackerZoneUpdateController(db)).Methods("PUT")
  84. r.HandleFunc("/reslevis/getTrackers", controller.TrackerList(db)).Methods("GET")
  85. r.HandleFunc("/reslevis/postTracker", controller.TrackerAdd(db, writer, ctx)).Methods("POST")
  86. r.HandleFunc("/reslevis/removeTracker/{id}", controller.TrackerDelete(db, writer, ctx)).Methods("DELETE")
  87. r.HandleFunc("/reslevis/updateTracker", controller.TrackerUpdate(db)).Methods("PUT")
  88. wsHandler := http.HandlerFunc(serveWs(appState, ctx))
  89. restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r)
  90. mainHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  91. if strings.HasPrefix(r.URL.Path, "/api/beacons/ws") {
  92. wsHandler.ServeHTTP(w, r)
  93. return
  94. }
  95. restApiHandler.ServeHTTP(w, r)
  96. })
  97. server := http.Server{
  98. Addr: cfg.HTTPAddr,
  99. Handler: mainHandler,
  100. }
  101. go server.ListenAndServe()
  102. eventLoop:
  103. for {
  104. select {
  105. case <-ctx.Done():
  106. break eventLoop
  107. case msg := <-chLoc:
  108. id := msg.ID
  109. if err := db.First(&model.Tracker{}, "id = ?", id).Error; err != nil {
  110. fmt.Printf("Location event for untracked beacon: %s\n", id)
  111. continue
  112. }
  113. if err := db.Updates(&model.Tracker{ID: id, Location: msg.Location, Distance: msg.Distance}).Error; err != nil {
  114. fmt.Println("Error in saving distance for beacon: ", err)
  115. continue
  116. }
  117. // if err := service.LocationToBeaconService(msg, appState, ctx); err != nil {
  118. // eMsg := fmt.Sprintf("Error in writing location change to beacon: %v\n", err)
  119. // slog.Error(eMsg)
  120. // }
  121. case msg := <-chEvents:
  122. fmt.Printf("event: %+v\n", msg)
  123. id := msg.ID
  124. if err := db.First(&model.Tracker{}, "id = ?", id).Error; err != nil {
  125. fmt.Printf("Decoder event for untracked beacon: %s\n", id)
  126. continue
  127. }
  128. if err := db.Updates(&model.Tracker{ID: id, Battery: msg.Battery}).Error; err != nil {
  129. fmt.Printf("Error in saving decoder event for beacon: %s\n", id)
  130. continue
  131. }
  132. // if err := service.EventToBeaconService(msg, appState, ctx); err != nil {
  133. // eMsg := fmt.Sprintf("Error in writing event change to beacon: %v\n", err)
  134. // slog.Error(eMsg)
  135. // }
  136. }
  137. }
  138. if err := server.Shutdown(context.Background()); err != nil {
  139. eMsg := fmt.Sprintf("could not shutdown: %v\n", err)
  140. slog.Error(eMsg)
  141. }
  142. slog.Info("API SERVER: \n")
  143. slog.Warn("broken out of the main event loop and HTTP server shutdown\n")
  144. wg.Wait()
  145. slog.Info("All go routines have stopped, Beggining to close Kafka connections\n")
  146. appState.CleanKafkaReaders()
  147. appState.CleanKafkaWriters()
  148. slog.Info("All kafka clients shutdown, starting shutdown of valkey client")
  149. slog.Info("API server shutting down")
  150. logFile.Close()
  151. }
  152. func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc {
  153. return func(w http.ResponseWriter, r *http.Request) {
  154. ws, err := upgrader.Upgrade(w, r, nil)
  155. if err != nil {
  156. if _, ok := err.(websocket.HandshakeError); !ok {
  157. eMsg := fmt.Sprintf("could not upgrade ws connection: %v\n", err)
  158. slog.Error(eMsg)
  159. }
  160. return
  161. }
  162. wg.Add(2)
  163. go writer(ws, appstate, ctx)
  164. go reader(ws, ctx)
  165. }
  166. }
  167. func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Context) {
  168. pingTicker := time.NewTicker((60 * 9) / 10 * time.Second)
  169. beaconTicker := time.NewTicker(2 * time.Second)
  170. defer func() {
  171. pingTicker.Stop()
  172. beaconTicker.Stop()
  173. ws.Close()
  174. wg.Done()
  175. }()
  176. for {
  177. select {
  178. case <-ctx.Done():
  179. slog.Info("WebSocket writer received shutdown signal.")
  180. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  181. ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  182. return
  183. case <-beaconTicker.C:
  184. beacons := appstate.GetAllHttpResults()
  185. js, err := json.Marshal(beacons)
  186. if err != nil {
  187. js = []byte("error")
  188. }
  189. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  190. if err := ws.WriteMessage(websocket.TextMessage, js); err != nil {
  191. return
  192. }
  193. case <-pingTicker.C:
  194. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  195. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  196. return
  197. }
  198. }
  199. }
  200. }
  201. func reader(ws *websocket.Conn, ctx context.Context) {
  202. defer func() {
  203. ws.Close()
  204. wg.Done()
  205. }()
  206. ws.SetReadLimit(512)
  207. ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second))
  208. ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil })
  209. for {
  210. select {
  211. case <-ctx.Done():
  212. slog.Info("closing ws reader")
  213. return
  214. default:
  215. _, _, err := ws.ReadMessage()
  216. if err != nil {
  217. return
  218. }
  219. }
  220. }
  221. }