You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

178 lines
5.3 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "net/http"
  8. "os/signal"
  9. "sync"
  10. "syscall"
  11. "time"
  12. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  13. "github.com/AFASystems/presence/internal/pkg/config"
  14. "github.com/AFASystems/presence/internal/pkg/controller"
  15. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  16. "github.com/AFASystems/presence/internal/pkg/model"
  17. "github.com/AFASystems/presence/internal/pkg/service"
  18. "github.com/gorilla/handlers"
  19. "github.com/gorilla/mux"
  20. "github.com/gorilla/websocket"
  21. )
  22. var upgrader = websocket.Upgrader{
  23. ReadBufferSize: 1024,
  24. WriteBufferSize: 1024,
  25. }
  26. var wg sync.WaitGroup
  27. func main() {
  28. cfg := config.Load()
  29. appState := appcontext.NewAppState()
  30. // define context
  31. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  32. defer stop()
  33. headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
  34. originsOk := handlers.AllowedOrigins([]string{"*"})
  35. methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})
  36. writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons")
  37. settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings")
  38. locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server")
  39. alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
  40. client := appState.AddValkeyClient(cfg.ValkeyURL)
  41. fmt.Println("Init of kafka writers and readers done")
  42. chLoc := make(chan model.HTTPLocation, 200)
  43. chEvents := make(chan model.BeaconEvent, 500)
  44. wg.Add(2)
  45. go kafkaclient.Consume(locationReader, chLoc, ctx, &wg)
  46. go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg)
  47. r := mux.NewRouter()
  48. fmt.Println("new print")
  49. fmt.Println("new print")
  50. // For now just add beacon DELETE / GET / POST / PUT methods
  51. r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsDeleteController(writer, ctx, appState)).Methods("DELETE")
  52. r.HandleFunc("/api/beacons", controller.BeaconsListController(appState)).Methods("GET")
  53. r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsListSingleController(appState)).Methods("GET")
  54. r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx)).Methods("POST")
  55. r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx)).Methods("PUT")
  56. r.HandleFunc("/api/settings", controller.SettingsListController(appState, client, ctx)).Methods("GET")
  57. r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, client, ctx)).Methods("POST")
  58. // r.HandleFunc("/api/beacons/ws", serveWs(appState, ctx))
  59. server := http.Server{
  60. Addr: cfg.HTTPAddr,
  61. Handler: handlers.CORS(originsOk, headersOk, methodsOk)(r),
  62. }
  63. go server.ListenAndServe()
  64. eventLoop:
  65. for {
  66. select {
  67. case <-ctx.Done():
  68. break eventLoop
  69. case msg := <-chLoc:
  70. if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil {
  71. fmt.Printf("Error in writing location change to beacon: %v\n", err)
  72. }
  73. case msg := <-chEvents:
  74. if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil {
  75. fmt.Printf("Error in writing event change to beacon: %v\n", err)
  76. }
  77. }
  78. }
  79. if err := server.Shutdown(context.Background()); err != nil {
  80. fmt.Printf("could not shutdown: %v\n", err)
  81. }
  82. fmt.Println("broken out of the main event loop and HTTP server shutdown")
  83. wg.Wait()
  84. fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
  85. appState.CleanKafkaReaders()
  86. appState.CleanKafkaWriters()
  87. fmt.Println("All kafka clients shutdown, starting shutdown of valkey client")
  88. appState.CleanValkeyClient()
  89. }
  90. func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc {
  91. return func(w http.ResponseWriter, r *http.Request) {
  92. ws, err := upgrader.Upgrade(w, r, nil)
  93. if err != nil {
  94. if _, ok := err.(websocket.HandshakeError); !ok {
  95. log.Println(err)
  96. }
  97. return
  98. }
  99. wg.Add(1)
  100. go writer(ws, appstate, ctx)
  101. reader(ws)
  102. }
  103. }
  104. func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Context) {
  105. pingTicker := time.NewTicker((60 * 9) / 10 * time.Second)
  106. beaconTicker := time.NewTicker(2 * time.Second)
  107. defer func() {
  108. pingTicker.Stop()
  109. beaconTicker.Stop()
  110. ws.Close()
  111. wg.Done()
  112. }()
  113. for {
  114. select {
  115. case <-ctx.Done():
  116. log.Println("WebSocket writer received shutdown signal.")
  117. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  118. ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  119. return
  120. case <-beaconTicker.C:
  121. beacons := appstate.GetAllBeacons()
  122. js, err := json.Marshal(beacons)
  123. if err != nil {
  124. js = []byte("error")
  125. }
  126. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  127. if err := ws.WriteMessage(websocket.TextMessage, js); err != nil {
  128. return
  129. }
  130. case <-pingTicker.C:
  131. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  132. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  133. return
  134. }
  135. }
  136. }
  137. }
  138. func reader(ws *websocket.Conn) {
  139. defer func() {
  140. ws.Close()
  141. }()
  142. ws.SetReadLimit(512)
  143. ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second))
  144. ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil })
  145. for {
  146. _, _, err := ws.ReadMessage()
  147. if err != nil {
  148. break
  149. }
  150. }
  151. }