Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.
 
 
 
 

165 linhas
5.0 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "net/http"
  8. "os/signal"
  9. "sync"
  10. "syscall"
  11. "time"
  12. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  13. "github.com/AFASystems/presence/internal/pkg/config"
  14. "github.com/AFASystems/presence/internal/pkg/controller"
  15. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  16. "github.com/AFASystems/presence/internal/pkg/model"
  17. "github.com/AFASystems/presence/internal/pkg/service"
  18. "github.com/gorilla/handlers"
  19. "github.com/gorilla/mux"
  20. "github.com/gorilla/websocket"
  21. )
  22. var upgrader = websocket.Upgrader{
  23. ReadBufferSize: 1024,
  24. WriteBufferSize: 1024,
  25. }
  26. var wg sync.WaitGroup
  27. func main() {
  28. cfg := config.Load()
  29. appState := appcontext.NewAppState()
  30. // define context
  31. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  32. defer stop()
  33. headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
  34. originsOk := handlers.AllowedOrigins([]string{"*"})
  35. methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})
  36. writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons")
  37. settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings")
  38. locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server")
  39. alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
  40. client := appState.AddValkeyClient(cfg.ValkeyURL)
  41. // Need Lua script to pull all of the beacons in one go on init
  42. chLoc := make(chan model.HTTPLocation, 200)
  43. chEvents := make(chan model.BeaconEvent, 500)
  44. wg.Add(2)
  45. go kafkaclient.Consume(locationReader, chLoc, ctx, &wg)
  46. go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg)
  47. r := mux.NewRouter()
  48. // For now just add beacon DELETE / GET / POST / PUT methods
  49. r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsDeleteController(writer, ctx, appState)).Methods("DELETE")
  50. r.HandleFunc("/api/beacons", controller.BeaconsListController(appState)).Methods("GET")
  51. r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsListSingleController(appState)).Methods("GET")
  52. r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx)).Methods("POST")
  53. r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx)).Methods("PUT")
  54. r.HandleFunc("/api/settings", controller.SettingsListController(appState, client, ctx)).Methods("GET")
  55. r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, client, ctx)).Methods("POST")
  56. r.HandleFunc("/api/beacons/ws", serveWs(appState, ctx))
  57. http.ListenAndServe(cfg.HTTPAddr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
  58. eventLoop:
  59. for {
  60. select {
  61. case <-ctx.Done():
  62. break eventLoop
  63. case msg := <-chLoc:
  64. if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil {
  65. fmt.Printf("Error in writing location change to beacon: %v\n", err)
  66. }
  67. case msg := <-chEvents:
  68. if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil {
  69. fmt.Printf("Error in writing event change to beacon: %v\n", err)
  70. }
  71. }
  72. }
  73. fmt.Println("broken out of the main event loop")
  74. wg.Wait()
  75. fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
  76. appState.CleanKafkaReaders()
  77. appState.CleanKafkaWriters()
  78. fmt.Println("All kafka clients shutdown, starting shutdown of valkey client")
  79. appState.CleanValkeyClient()
  80. }
  81. func serveWs(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc {
  82. return func(w http.ResponseWriter, r *http.Request) {
  83. ws, err := upgrader.Upgrade(w, r, nil)
  84. if err != nil {
  85. if _, ok := err.(websocket.HandshakeError); !ok {
  86. log.Println(err)
  87. }
  88. return
  89. }
  90. wg.Add(1)
  91. go writer(ws, appstate, ctx)
  92. reader(ws)
  93. }
  94. }
  95. func writer(ws *websocket.Conn, appstate *appcontext.AppState, ctx context.Context) {
  96. pingTicker := time.NewTicker((60 * 9) / 10 * time.Second)
  97. beaconTicker := time.NewTicker(2 * time.Second)
  98. defer func() {
  99. pingTicker.Stop()
  100. beaconTicker.Stop()
  101. ws.Close()
  102. wg.Done()
  103. }()
  104. for {
  105. select {
  106. case <-ctx.Done():
  107. log.Println("WebSocket writer received shutdown signal.")
  108. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  109. ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
  110. return
  111. case <-beaconTicker.C:
  112. beacons := appstate.GetAllBeacons()
  113. js, err := json.Marshal(beacons)
  114. if err != nil {
  115. js = []byte("error")
  116. }
  117. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  118. if err := ws.WriteMessage(websocket.TextMessage, js); err != nil {
  119. return
  120. }
  121. case <-pingTicker.C:
  122. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  123. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  124. return
  125. }
  126. }
  127. }
  128. }
  129. func reader(ws *websocket.Conn) {
  130. defer ws.Close()
  131. ws.SetReadLimit(512)
  132. ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second))
  133. ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add((60 * 9) / 10 * time.Second)); return nil })
  134. for {
  135. _, _, err := ws.ReadMessage()
  136. if err != nil {
  137. break
  138. }
  139. }
  140. }