您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 

429 行
12 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "net/http"
  8. "os/signal"
  9. "strings"
  10. "sync"
  11. "syscall"
  12. "time"
  13. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  14. "github.com/AFASystems/presence/internal/pkg/config"
  15. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  16. "github.com/AFASystems/presence/internal/pkg/model"
  17. "github.com/gorilla/handlers"
  18. "github.com/gorilla/mux"
  19. "github.com/gorilla/websocket"
  20. "github.com/redis/go-redis/v9"
  21. "github.com/segmentio/kafka-go"
  22. )
  23. var upgrader = websocket.Upgrader{
  24. CheckOrigin: func(r *http.Request) bool { return true },
  25. }
  26. var wg sync.WaitGroup
  27. func main() {
  28. cfg := config.Load()
  29. appState := appcontext.NewAppState()
  30. // define context
  31. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  32. defer stop()
  33. headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
  34. originsOk := handlers.AllowedOrigins([]string{"*"})
  35. methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})
  36. writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons")
  37. settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings")
  38. locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server")
  39. alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
  40. client := appState.AddValkeyClient(cfg.ValkeyURL)
  41. // Separate channel for location change?
  42. chLoc := make(chan model.HTTPLocation, 200)
  43. chEvents := make(chan model.BeaconEvent, 500)
  44. wg.Add(2)
  45. go kafkaclient.Consume(locationReader, chLoc, ctx, &wg)
  46. go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg)
  47. r := mux.NewRouter()
  48. // declare WS clients list | do I need it though? or will locations worker send message
  49. // to kafka and then only this service (server) is being used for communication with the clients
  50. clients := make(map[*websocket.Conn]bool)
  51. // Declare broadcast channel
  52. broadcast := make(chan model.Message)
  53. // For now just add beacon DELETE / GET / POST / PUT methods
  54. r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE")
  55. r.HandleFunc("/api/beacons", beaconsListHandler(appState)).Methods("GET")
  56. r.HandleFunc("/api/beacons/{beacon_id}", beaconsListSingleHandler(appState)).Methods("GET")
  57. r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST")
  58. r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT")
  59. // r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET")
  60. r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST")
  61. // Handler for WS messages
  62. // No point in having seperate route for each message type, better to handle different message types in one connection
  63. // r.HandleFunc("/ws/api/beacons", serveWs(client))
  64. // r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client))
  65. r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast))
  66. http.ListenAndServe("0.0.0.0:1902", handlers.CORS(originsOk, headersOk, methodsOk)(r))
  67. eventLoop:
  68. for {
  69. select {
  70. case <-ctx.Done():
  71. break eventLoop
  72. case msg := <-chLoc:
  73. beacon, ok := appState.GetBeacon(msg.ID)
  74. if !ok {
  75. appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation})
  76. } else {
  77. beacon.ID = msg.ID
  78. beacon.Location = msg.Location
  79. beacon.Distance = msg.Distance
  80. beacon.LastSeen = msg.LastSeen
  81. beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation
  82. appState.UpdateBeacon(msg.ID, beacon)
  83. }
  84. key := fmt.Sprintf("beacon:%s", msg.ID)
  85. hashM, err := msg.RedisHashable()
  86. if err != nil {
  87. fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
  88. continue
  89. }
  90. if err := client.HSet(ctx, key, hashM).Err(); err != nil {
  91. fmt.Println("Error in persisting set in Redis key: ", key)
  92. continue
  93. }
  94. if err := client.SAdd(ctx, "beacons", key).Err(); err != nil {
  95. fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err)
  96. }
  97. case msg := <-chEvents:
  98. beacon, ok := appState.GetBeacon(msg.ID)
  99. if !ok {
  100. appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, BeaconType: msg.Type, HSBattery: int64(msg.Battery), Event: msg.Event})
  101. } else {
  102. beacon.ID = msg.ID
  103. beacon.BeaconType = msg.Type
  104. beacon.HSBattery = int64(msg.Battery)
  105. beacon.Event = msg.Event
  106. appState.UpdateBeacon(msg.ID, beacon)
  107. }
  108. key := fmt.Sprintf("beacon:%s", msg.ID)
  109. hashM, err := msg.RedisHashable()
  110. if err != nil {
  111. fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
  112. continue
  113. }
  114. if err := client.HSet(ctx, key, hashM).Err(); err != nil {
  115. fmt.Println("Error in persisting set in Redis key: ", key)
  116. continue
  117. }
  118. if err := client.SAdd(ctx, "beacons", key).Err(); err != nil {
  119. fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err)
  120. }
  121. }
  122. }
  123. fmt.Println("broken out of the main event loop")
  124. wg.Wait()
  125. fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
  126. appState.CleanKafkaReaders()
  127. appState.CleanKafkaWriters()
  128. fmt.Println("All kafka clients shutdown, starting shutdown of valkey client")
  129. appState.CleanValkeyClient()
  130. }
  131. func beaconsListSingleHandler(appstate *appcontext.AppState) http.HandlerFunc {
  132. return func(w http.ResponseWriter, r *http.Request) {
  133. vars := mux.Vars(r)
  134. id := vars["beacon_id"]
  135. beacon, ok := appstate.GetBeacon(id)
  136. if !ok {
  137. w.Header().Set("Content-Type", "application/json")
  138. w.WriteHeader(http.StatusNotFound)
  139. json.NewEncoder(w).Encode(map[string]string{"error": "Beacon not found"})
  140. return
  141. }
  142. w.Header().Set("Content-Type", "application/json")
  143. w.WriteHeader(http.StatusOK)
  144. json.NewEncoder(w).Encode(beacon)
  145. }
  146. }
  147. func beaconsListHandler(appstate *appcontext.AppState) http.HandlerFunc {
  148. return func(w http.ResponseWriter, r *http.Request) {
  149. beacons := appstate.GetAllBeacons()
  150. w.Header().Set("Content-Type", "application/json")
  151. w.WriteHeader(http.StatusOK)
  152. json.NewEncoder(w).Encode(beacons)
  153. }
  154. }
  155. // Probably define value as interface and then reuse this writer in all of the functions
  156. func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) bool {
  157. valueStr, err := json.Marshal(&value)
  158. if err != nil {
  159. fmt.Println("error in encoding: ", err)
  160. return false
  161. }
  162. msg := kafka.Message{
  163. Value: valueStr,
  164. }
  165. err = writer.WriteMessages(context.Background(), msg)
  166. if err != nil {
  167. fmt.Println("Error in sending kafka message: ")
  168. return false
  169. }
  170. return true
  171. }
  172. func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc {
  173. return func(w http.ResponseWriter, r *http.Request) {
  174. vars := mux.Vars(r)
  175. beaconId := vars["beacon_id"]
  176. apiUpdate := model.ApiUpdate{
  177. Method: "DELETE",
  178. ID: beaconId,
  179. }
  180. fmt.Println("Sending DELETE message")
  181. flag := sendKafkaMessage(writer, &apiUpdate)
  182. if !flag {
  183. fmt.Println("error in sending Kafka message")
  184. http.Error(w, "Error in sending kafka message", 500)
  185. return
  186. }
  187. w.Write([]byte("ok"))
  188. }
  189. }
  190. func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc {
  191. return func(w http.ResponseWriter, r *http.Request) {
  192. decoder := json.NewDecoder(r.Body)
  193. var inBeacon model.Beacon
  194. err := decoder.Decode(&inBeacon)
  195. if err != nil {
  196. http.Error(w, err.Error(), 400)
  197. return
  198. }
  199. fmt.Println("sending POST message")
  200. if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.ID)) == 0) {
  201. http.Error(w, "name and beacon_id cannot be blank", 400)
  202. return
  203. }
  204. apiUpdate := model.ApiUpdate{
  205. Method: "POST",
  206. Beacon: inBeacon,
  207. }
  208. flag := sendKafkaMessage(writer, &apiUpdate)
  209. if !flag {
  210. fmt.Println("error in sending Kafka message")
  211. http.Error(w, "Error in sending kafka message", 500)
  212. return
  213. }
  214. w.Write([]byte("ok"))
  215. }
  216. }
  217. func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc {
  218. return func(w http.ResponseWriter, r *http.Request) {
  219. decoder := json.NewDecoder(r.Body)
  220. var inSettings model.SettingsVal
  221. if err := decoder.Decode(&inSettings); err != nil {
  222. http.Error(w, err.Error(), 400)
  223. fmt.Println("Error in decoding Settings body: ", err)
  224. return
  225. }
  226. if !settingsCheck(inSettings) {
  227. http.Error(w, "values must be greater than 0", 400)
  228. fmt.Println("settings values must be greater than 0")
  229. return
  230. }
  231. valueStr, err := json.Marshal(&inSettings)
  232. if err != nil {
  233. http.Error(w, "Error in encoding settings", 500)
  234. fmt.Println("Error in encoding settings: ", err)
  235. return
  236. }
  237. msg := kafka.Message{
  238. Value: valueStr,
  239. }
  240. if err := writer.WriteMessages(context.Background(), msg); err != nil {
  241. fmt.Println("error in sending Kafka message")
  242. http.Error(w, "Error in sending kafka message", 500)
  243. return
  244. }
  245. w.Write([]byte("ok"))
  246. }
  247. }
  248. func settingsCheck(settings model.SettingsVal) bool {
  249. if settings.LocationConfidence <= 0 || settings.LastSeenThreshold <= 0 || settings.HASendInterval <= 0 {
  250. return false
  251. }
  252. return true
  253. }
  254. func serveWs(client *redis.Client) http.HandlerFunc {
  255. return func(w http.ResponseWriter, r *http.Request) {
  256. ws, err := upgrader.Upgrade(w, r, nil)
  257. if err != nil {
  258. if _, ok := err.(websocket.HandshakeError); !ok {
  259. log.Println(err)
  260. }
  261. return
  262. }
  263. go writer(ws, client)
  264. reader(ws)
  265. }
  266. }
  267. func writer(ws *websocket.Conn, client *redis.Client) {
  268. pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
  269. beaconTicker := time.NewTicker(2 * time.Second)
  270. defer func() {
  271. pingTicker.Stop()
  272. beaconTicker.Stop()
  273. ws.Close()
  274. }()
  275. for {
  276. select {
  277. case <-beaconTicker.C:
  278. httpresults, err := client.Get(context.Background(), "httpresults").Result()
  279. if err == redis.Nil {
  280. fmt.Println("no beacons list, starting empty")
  281. } else if err != nil {
  282. panic(err)
  283. } else {
  284. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  285. if err := ws.WriteMessage(websocket.TextMessage, []byte(httpresults)); err != nil {
  286. return
  287. }
  288. }
  289. case <-pingTicker.C:
  290. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  291. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  292. return
  293. }
  294. }
  295. }
  296. }
  297. func serveLatestBeaconsWs(client *redis.Client) http.HandlerFunc {
  298. return func(w http.ResponseWriter, r *http.Request) {
  299. ws, err := upgrader.Upgrade(w, r, nil)
  300. if err != nil {
  301. if _, ok := err.(websocket.HandshakeError); !ok {
  302. log.Println(err)
  303. }
  304. return
  305. }
  306. go latestBeaconWriter(ws, client)
  307. reader(ws)
  308. }
  309. }
  310. // This and writer can be refactored in one function
  311. func latestBeaconWriter(ws *websocket.Conn, client *redis.Client) {
  312. pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
  313. beaconTicker := time.NewTicker(2 * time.Second)
  314. defer func() {
  315. pingTicker.Stop()
  316. beaconTicker.Stop()
  317. ws.Close()
  318. }()
  319. for {
  320. select {
  321. case <-beaconTicker.C:
  322. latestbeacons, err := client.Get(context.Background(), "latestbeacons").Result()
  323. if err == redis.Nil {
  324. fmt.Println("no beacons list, starting empty")
  325. } else if err != nil {
  326. panic(err)
  327. } else {
  328. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  329. if err := ws.WriteMessage(websocket.TextMessage, []byte(latestbeacons)); err != nil {
  330. return
  331. }
  332. }
  333. case <-pingTicker.C:
  334. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  335. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  336. return
  337. }
  338. }
  339. }
  340. }
  341. func reader(ws *websocket.Conn) {
  342. defer ws.Close()
  343. ws.SetReadLimit(512)
  344. ws.SetReadDeadline(time.Now().Add(60 * time.Second))
  345. ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(60 * time.Second)); return nil })
  346. for {
  347. _, _, err := ws.ReadMessage()
  348. if err != nil {
  349. break
  350. }
  351. }
  352. }
  353. func handleConnections(clients map[*websocket.Conn]bool, broadcast chan model.Message) http.HandlerFunc {
  354. return func(w http.ResponseWriter, r *http.Request) {
  355. ws, err := upgrader.Upgrade(w, r, nil)
  356. if err != nil {
  357. log.Fatal(err)
  358. }
  359. defer ws.Close()
  360. clients[ws] = true
  361. for {
  362. var msg model.Message
  363. err := ws.ReadJSON(&msg)
  364. if err != nil {
  365. log.Printf("error: %v", err)
  366. delete(clients, ws)
  367. break
  368. }
  369. broadcast <- msg
  370. }
  371. }
  372. }