You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

277 rivejä
8.6 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "log"
  8. "log/slog"
  9. "net/http"
  10. "os"
  11. "os/signal"
  12. "strings"
  13. "sync"
  14. "syscall"
  15. "time"
  16. "github.com/AFASystems/presence/internal/pkg/apiclient"
  17. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  18. "github.com/AFASystems/presence/internal/pkg/config"
  19. "github.com/AFASystems/presence/internal/pkg/controller"
  20. "github.com/AFASystems/presence/internal/pkg/database"
  21. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  22. "github.com/AFASystems/presence/internal/pkg/model"
  23. "github.com/AFASystems/presence/internal/pkg/service"
  24. "github.com/gorilla/handlers"
  25. "github.com/gorilla/mux"
  26. "github.com/gorilla/websocket"
  27. "gorm.io/gorm"
  28. )
  29. var upgrader = websocket.Upgrader{
  30. ReadBufferSize: 1024,
  31. WriteBufferSize: 1024,
  32. }
  33. var _ io.Writer = (*os.File)(nil)
  34. var wg sync.WaitGroup
  35. func main() {
  36. cfg := config.Load()
  37. appState := appcontext.NewAppState()
  38. // Create log file
  39. logFile, err := os.OpenFile("server.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
  40. if err != nil {
  41. log.Fatalf("Failed to open log file: %v\n", err)
  42. }
  43. // shell and log file multiwriter
  44. w := io.MultiWriter(os.Stderr, logFile)
  45. logger := slog.New(slog.NewJSONHandler(w, nil))
  46. slog.SetDefault(logger)
  47. // define context
  48. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  49. defer stop()
  50. db, err := database.Connect(cfg)
  51. if err != nil {
  52. log.Fatalf("Failed to open database connection: %v\n", err)
  53. }
  54. headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
  55. originsOk := handlers.AllowedOrigins([]string{"*"})
  56. methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})
  57. writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons")
  58. settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings")
  59. alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alert")
  60. parserWriter := appState.AddKafkaWriter(cfg.KafkaURL, "parser")
  61. slog.Info("Kafka writers topics: apibeacons, settings initialized")
  62. configFile, err := os.Open("/app/cmd/server/config.json")
  63. if err != nil {
  64. panic(err)
  65. }
  66. b, _ := io.ReadAll(configFile)
  67. var configs []model.Config
  68. json.Unmarshal(b, &configs)
  69. for _, config := range configs {
  70. // persist read configs in database
  71. db.Create(&config)
  72. }
  73. db.Find(&configs)
  74. for _, config := range configs {
  75. kp := model.KafkaParser{
  76. ID: "add",
  77. Config: config,
  78. }
  79. if err := service.SendParserConfig(kp, parserWriter, ctx); err != nil {
  80. fmt.Printf("Unable to send parser config to kafka broker %v\n", err)
  81. }
  82. }
  83. if err := apiclient.UpdateDB(db, ctx, cfg, writer); err != nil {
  84. fmt.Printf("Error in getting token: %v\n", err)
  85. }
  86. locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server")
  87. alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
  88. slog.Info("Kafka readers topics: locevents, alertbeacons initialized")
  89. chLoc := make(chan model.HTTPLocation, 200)
  90. chEvents := make(chan model.BeaconEvent, 500)
  91. wg.Add(2)
  92. go kafkaclient.Consume(locationReader, chLoc, ctx, &wg)
  93. go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg)
  94. r := mux.NewRouter()
  95. r.HandleFunc("/api/settings", controller.SettingsListController(appState, ctx)).Methods("GET")
  96. r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, ctx)).Methods("POST")
  97. r.HandleFunc("/reslevis/getGateways", controller.GatewayListController(db)).Methods("GET")
  98. r.HandleFunc("/reslevis/postGateway", controller.GatewayAddController(db)).Methods("POST")
  99. r.HandleFunc("/reslevis/removeGateway/{id}", controller.GatewayDeleteController(db)).Methods("DELETE")
  100. r.HandleFunc("/reslevis/updateGateway/{id}", controller.GatewayUpdateController(db)).Methods("PUT")
  101. r.HandleFunc("/reslevis/getZones", controller.ZoneListController(db)).Methods("GET")
  102. r.HandleFunc("/reslevis/postZone", controller.ZoneAddController(db)).Methods("POST")
  103. r.HandleFunc("/reslevis/removeZone/{id}", controller.ZoneDeleteController(db)).Methods("DELETE")
  104. r.HandleFunc("/reslevis/updateZone", controller.ZoneUpdateController(db)).Methods("PUT")
  105. r.HandleFunc("/reslevis/getTrackerZones", controller.TrackerZoneListController(db)).Methods("GET")
  106. r.HandleFunc("/reslevis/postTrackerZone", controller.TrackerZoneAddController(db)).Methods("POST")
  107. r.HandleFunc("/reslevis/removeTrackerZone/{id}", controller.TrackerZoneDeleteController(db)).Methods("DELETE")
  108. r.HandleFunc("/reslevis/updateTrackerZone", controller.TrackerZoneUpdateController(db)).Methods("PUT")
  109. r.HandleFunc("/reslevis/getTrackers", controller.TrackerList(db)).Methods("GET")
  110. r.HandleFunc("/reslevis/postTracker", controller.TrackerAdd(db, writer, ctx)).Methods("POST")
  111. r.HandleFunc("/reslevis/removeTracker/{id}", controller.TrackerDelete(db, writer, ctx)).Methods("DELETE")
  112. r.HandleFunc("/reslevis/updateTracker", controller.TrackerUpdate(db)).Methods("PUT")
  113. r.HandleFunc("/configs/beacons", controller.ParserListController(db)).Methods("GET")
  114. r.HandleFunc("/configs/beacons", controller.ParserAddController(db, parserWriter, ctx)).Methods("POST")
  115. r.HandleFunc("/configs/beacons/{id}", controller.ParserUpdateController(db, parserWriter, ctx)).Methods("PUT")
  116. r.HandleFunc("/configs/beacons/{id}", controller.ParserDeleteController(db, parserWriter, ctx)).Methods("DELETE")
  117. wsHandler := http.HandlerFunc(serveWs(db, ctx))
  118. restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r)
  119. mainHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  120. if strings.HasPrefix(r.URL.Path, "/api/beacons/ws") {
  121. wsHandler.ServeHTTP(w, r)
  122. return
  123. }
  124. restApiHandler.ServeHTTP(w, r)
  125. })
  126. server := http.Server{
  127. Addr: cfg.HTTPAddr,
  128. Handler: mainHandler,
  129. }
  130. go server.ListenAndServe()
  131. eventLoop:
  132. for {
  133. select {
  134. case <-ctx.Done():
  135. break eventLoop
  136. case msg := <-chLoc:
  137. service.LocationToBeaconService(msg, db, alertWriter, ctx)
  138. case msg := <-chEvents:
  139. fmt.Printf("event: %+v\n", msg)
  140. id := msg.ID
  141. if err := db.First(&model.Tracker{}, "id = ?", id).Error; err != nil {
  142. fmt.Printf("Decoder event for untracked beacon: %s\n", id)
  143. continue
  144. }
  145. if err := db.Updates(&model.Tracker{ID: id, Battery: msg.Battery}).Error; err != nil {
  146. fmt.Printf("Error in saving decoder event for beacon: %s\n", id)
  147. continue
  148. }
  149. }
  150. }
  151. if err := server.Shutdown(context.Background()); err != nil {
  152. eMsg := fmt.Sprintf("could not shutdown: %v\n", err)
  153. slog.Error(eMsg)
  154. }
  155. slog.Info("API SERVER: \n")
  156. slog.Warn("broken out of the main event loop and HTTP server shutdown\n")
  157. wg.Wait()
  158. slog.Info("All go routines have stopped, Beggining to close Kafka connections\n")
  159. appState.CleanKafkaReaders()
  160. appState.CleanKafkaWriters()
  161. slog.Info("All kafka clients shutdown, starting shutdown of valkey client")
  162. slog.Info("API server shutting down")
  163. logFile.Close()
  164. }
  165. func serveWs(db *gorm.DB, ctx context.Context) http.HandlerFunc {
  166. return func(w http.ResponseWriter, r *http.Request) {
  167. ws, err := upgrader.Upgrade(w, r, nil)
  168. if err != nil {
  169. if _, ok := err.(websocket.HandshakeError); !ok {
  170. eMsg := fmt.Sprintf("could not upgrade ws connection: %v\n", err)
  171. slog.Error(eMsg)
  172. }
  173. return
  174. }
  175. wg.Add(2)
  176. go writer(ws, db, ctx)
  177. go reader(ws, ctx)
  178. }
  179. }
  180. func writer(ws *websocket.Conn, db *gorm.DB, ctx context.Context) {
  181. pingTicker := time.NewTicker((60 * 9) / 10 * time.Second)
  182. beaconTicker := time.NewTicker(2 * time.Second)
  183. defer func() {
  184. pingTicker.Stop()
  185. beaconTicker.Stop()
  186. ws.Close()
  187. wg.Done()
  188. }()
  189. for {
  190. select {
  191. case <-ctx.Done():
  192. slog.Info("WebSocket writer received shutdown signal.")
  193. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  194. ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  195. return
  196. case <-beaconTicker.C:
  197. var list []model.Tracker
  198. db.Find(&list)
  199. js, err := json.Marshal(list)
  200. if err != nil {
  201. js = []byte("error")
  202. }
  203. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  204. if err := ws.WriteMessage(websocket.TextMessage, js); err != nil {
  205. return
  206. }
  207. case <-pingTicker.C:
  208. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  209. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  210. return
  211. }
  212. }
  213. }
  214. }
  215. func reader(ws *websocket.Conn, ctx context.Context) {
  216. defer func() {
  217. ws.Close()
  218. wg.Done()
  219. }()
  220. ws.SetReadLimit(512)
  221. ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second))
  222. ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil })
  223. for {
  224. select {
  225. case <-ctx.Done():
  226. slog.Info("closing ws reader")
  227. return
  228. default:
  229. _, _, err := ws.ReadMessage()
  230. if err != nil {
  231. return
  232. }
  233. }
  234. }
  235. }