25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

417 lines
12 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "net/http"
  8. "os/signal"
  9. "strings"
  10. "sync"
  11. "syscall"
  12. "time"
  13. "github.com/AFASystems/presence/internal/pkg/common/appcontext"
  14. "github.com/AFASystems/presence/internal/pkg/config"
  15. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  16. "github.com/AFASystems/presence/internal/pkg/model"
  17. "github.com/gorilla/handlers"
  18. "github.com/gorilla/mux"
  19. "github.com/gorilla/websocket"
  20. "github.com/redis/go-redis/v9"
  21. "github.com/segmentio/kafka-go"
  22. )
  23. var upgrader = websocket.Upgrader{
  24. CheckOrigin: func(r *http.Request) bool { return true },
  25. }
  26. var wg sync.WaitGroup
  27. func main() {
  28. cfg := config.Load()
  29. appState := appcontext.NewAppState()
  30. // define context
  31. ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
  32. defer stop()
  33. headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
  34. originsOk := handlers.AllowedOrigins([]string{"*"})
  35. methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})
  36. writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons")
  37. settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings")
  38. locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server")
  39. alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
  40. client := appState.AddValkeyClient(cfg.ValkeyURL)
  41. // Separate channel for location change?
  42. chLoc := make(chan model.HTTPLocation, 200)
  43. chEvents := make(chan model.BeaconEvent, 500)
  44. wg.Add(2)
  45. go kafkaclient.Consume(locationReader, chLoc, ctx, &wg)
  46. go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg)
  47. r := mux.NewRouter()
  48. // For now just add beacon DELETE / GET / POST / PUT methods
  49. r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer, ctx)).Methods("DELETE")
  50. r.HandleFunc("/api/beacons", beaconsListController(appState)).Methods("GET")
  51. r.HandleFunc("/api/beacons/{beacon_id}", beaconsListSingleController(appState)).Methods("GET")
  52. r.HandleFunc("/api/beacons", beaconsAddHandler(writer, ctx)).Methods("POST")
  53. r.HandleFunc("/api/beacons", beaconsAddHandler(writer, ctx)).Methods("PUT")
  54. // r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET")
  55. r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST")
  56. http.ListenAndServe("0.0.0.0:1902", handlers.CORS(originsOk, headersOk, methodsOk)(r))
  57. eventLoop:
  58. for {
  59. select {
  60. case <-ctx.Done():
  61. break eventLoop
  62. case msg := <-chLoc:
  63. beacon, ok := appState.GetBeacon(msg.ID)
  64. if !ok {
  65. appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation})
  66. } else {
  67. beacon.ID = msg.ID
  68. beacon.Location = msg.Location
  69. beacon.Distance = msg.Distance
  70. beacon.LastSeen = msg.LastSeen
  71. beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation
  72. appState.UpdateBeacon(msg.ID, beacon)
  73. }
  74. if err := persistBeaconValkey(msg.ID, msg, client, ctx); err != nil {
  75. fmt.Printf("Error in persisting change location beacon: %v", err)
  76. }
  77. case msg := <-chEvents:
  78. beacon, ok := appState.GetBeacon(msg.ID)
  79. if !ok {
  80. appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, BeaconType: msg.Type, HSBattery: int64(msg.Battery), Event: msg.Event})
  81. } else {
  82. beacon.ID = msg.ID
  83. beacon.BeaconType = msg.Type
  84. beacon.HSBattery = int64(msg.Battery)
  85. beacon.Event = msg.Event
  86. appState.UpdateBeacon(msg.ID, beacon)
  87. }
  88. if err := persistBeaconValkey(msg.ID, msg, client, ctx); err != nil {
  89. fmt.Printf("Error in persisting change event beacon: %v", err)
  90. }
  91. }
  92. }
  93. fmt.Println("broken out of the main event loop")
  94. wg.Wait()
  95. fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
  96. appState.CleanKafkaReaders()
  97. appState.CleanKafkaWriters()
  98. fmt.Println("All kafka clients shutdown, starting shutdown of valkey client")
  99. appState.CleanValkeyClient()
  100. }
  101. type RedisHashable interface {
  102. RedisHashable() (map[string]interface{}, error)
  103. model.BeaconEvent | model.HTTPLocation
  104. }
  105. func persistBeaconValkey[T RedisHashable](id string, msg T, client *redis.Client, ctx context.Context) error {
  106. key := fmt.Sprintf("beacon:%s", id)
  107. hashM, err := msg.RedisHashable()
  108. if err != nil {
  109. fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
  110. return err
  111. }
  112. if err := client.HSet(ctx, key, hashM).Err(); err != nil {
  113. fmt.Println("Error in persisting set in Redis key: ", key)
  114. return err
  115. }
  116. if err := client.SAdd(ctx, "beacons", key).Err(); err != nil {
  117. fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err)
  118. return err
  119. }
  120. return nil
  121. }
  122. // Makes no sense to define service as there is no actual business logic
  123. func beaconsListSingleController(appstate *appcontext.AppState) http.HandlerFunc {
  124. return func(w http.ResponseWriter, r *http.Request) {
  125. vars := mux.Vars(r)
  126. id := vars["beacon_id"]
  127. beacon, ok := appstate.GetBeacon(id)
  128. if !ok {
  129. w.Header().Set("Content-Type", "application/json")
  130. w.WriteHeader(http.StatusNotFound)
  131. json.NewEncoder(w).Encode(map[string]string{"error": "Beacon not found"})
  132. return
  133. }
  134. w.Header().Set("Content-Type", "application/json")
  135. w.WriteHeader(http.StatusOK)
  136. json.NewEncoder(w).Encode(beacon)
  137. }
  138. }
  139. func beaconsListController(appstate *appcontext.AppState) http.HandlerFunc {
  140. return func(w http.ResponseWriter, r *http.Request) {
  141. beacons := appstate.GetAllBeacons()
  142. w.Header().Set("Content-Type", "application/json")
  143. w.WriteHeader(http.StatusOK)
  144. json.NewEncoder(w).Encode(beacons)
  145. }
  146. }
  147. // Probably define value as interface and then reuse this writer in all of the functions
  148. func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate, ctx context.Context) error {
  149. valueStr, err := json.Marshal(&value)
  150. if err != nil {
  151. fmt.Println("error in encoding: ", err)
  152. return err
  153. }
  154. msg := kafka.Message{
  155. Value: valueStr,
  156. }
  157. if err := writer.WriteMessages(ctx, msg); err != nil {
  158. fmt.Println("Error in sending kafka message: ", err)
  159. return err
  160. }
  161. return nil
  162. }
  163. func beaconsDeleteHandler(writer *kafka.Writer, ctx context.Context) http.HandlerFunc {
  164. return func(w http.ResponseWriter, r *http.Request) {
  165. vars := mux.Vars(r)
  166. beaconId := vars["beacon_id"]
  167. apiUpdate := model.ApiUpdate{
  168. Method: "DELETE",
  169. ID: beaconId,
  170. }
  171. fmt.Println("Sending DELETE message")
  172. if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil {
  173. fmt.Println("error in sending Kafka DELETE message")
  174. http.Error(w, "Error in sending kafka message", 500)
  175. return
  176. }
  177. w.Write([]byte("ok"))
  178. }
  179. }
  180. func beaconsAddHandler(writer *kafka.Writer, ctx context.Context) http.HandlerFunc {
  181. return func(w http.ResponseWriter, r *http.Request) {
  182. decoder := json.NewDecoder(r.Body)
  183. var inBeacon model.Beacon
  184. err := decoder.Decode(&inBeacon)
  185. if err != nil {
  186. http.Error(w, err.Error(), 400)
  187. return
  188. }
  189. fmt.Println("sending POST message")
  190. if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.ID)) == 0) {
  191. http.Error(w, "name and beacon_id cannot be blank", 400)
  192. return
  193. }
  194. apiUpdate := model.ApiUpdate{
  195. Method: "POST",
  196. Beacon: inBeacon,
  197. }
  198. if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil {
  199. fmt.Println("error in sending Kafka POST message")
  200. http.Error(w, "Error in sending kafka message", 500)
  201. return
  202. }
  203. w.Write([]byte("ok"))
  204. }
  205. }
  206. func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc {
  207. return func(w http.ResponseWriter, r *http.Request) {
  208. decoder := json.NewDecoder(r.Body)
  209. var inSettings model.SettingsVal
  210. if err := decoder.Decode(&inSettings); err != nil {
  211. http.Error(w, err.Error(), 400)
  212. fmt.Println("Error in decoding Settings body: ", err)
  213. return
  214. }
  215. if !settingsCheck(inSettings) {
  216. http.Error(w, "values must be greater than 0", 400)
  217. fmt.Println("settings values must be greater than 0")
  218. return
  219. }
  220. valueStr, err := json.Marshal(&inSettings)
  221. if err != nil {
  222. http.Error(w, "Error in encoding settings", 500)
  223. fmt.Println("Error in encoding settings: ", err)
  224. return
  225. }
  226. msg := kafka.Message{
  227. Value: valueStr,
  228. }
  229. if err := writer.WriteMessages(context.Background(), msg); err != nil {
  230. fmt.Println("error in sending Kafka message")
  231. http.Error(w, "Error in sending kafka message", 500)
  232. return
  233. }
  234. w.Write([]byte("ok"))
  235. }
  236. }
  237. func settingsCheck(settings model.SettingsVal) bool {
  238. if settings.LocationConfidence <= 0 || settings.LastSeenThreshold <= 0 || settings.HASendInterval <= 0 {
  239. return false
  240. }
  241. return true
  242. }
  243. func serveWs(client *redis.Client) http.HandlerFunc {
  244. return func(w http.ResponseWriter, r *http.Request) {
  245. ws, err := upgrader.Upgrade(w, r, nil)
  246. if err != nil {
  247. if _, ok := err.(websocket.HandshakeError); !ok {
  248. log.Println(err)
  249. }
  250. return
  251. }
  252. go writer(ws, client)
  253. reader(ws)
  254. }
  255. }
  256. func writer(ws *websocket.Conn, client *redis.Client) {
  257. pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
  258. beaconTicker := time.NewTicker(2 * time.Second)
  259. defer func() {
  260. pingTicker.Stop()
  261. beaconTicker.Stop()
  262. ws.Close()
  263. }()
  264. for {
  265. select {
  266. case <-beaconTicker.C:
  267. httpresults, err := client.Get(context.Background(), "httpresults").Result()
  268. if err == redis.Nil {
  269. fmt.Println("no beacons list, starting empty")
  270. } else if err != nil {
  271. panic(err)
  272. } else {
  273. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  274. if err := ws.WriteMessage(websocket.TextMessage, []byte(httpresults)); err != nil {
  275. return
  276. }
  277. }
  278. case <-pingTicker.C:
  279. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  280. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  281. return
  282. }
  283. }
  284. }
  285. }
  286. func serveLatestBeaconsWs(client *redis.Client) http.HandlerFunc {
  287. return func(w http.ResponseWriter, r *http.Request) {
  288. ws, err := upgrader.Upgrade(w, r, nil)
  289. if err != nil {
  290. if _, ok := err.(websocket.HandshakeError); !ok {
  291. log.Println(err)
  292. }
  293. return
  294. }
  295. go latestBeaconWriter(ws, client)
  296. reader(ws)
  297. }
  298. }
  299. // This and writer can be refactored in one function
  300. func latestBeaconWriter(ws *websocket.Conn, client *redis.Client) {
  301. pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
  302. beaconTicker := time.NewTicker(2 * time.Second)
  303. defer func() {
  304. pingTicker.Stop()
  305. beaconTicker.Stop()
  306. ws.Close()
  307. }()
  308. for {
  309. select {
  310. case <-beaconTicker.C:
  311. latestbeacons, err := client.Get(context.Background(), "latestbeacons").Result()
  312. if err == redis.Nil {
  313. fmt.Println("no beacons list, starting empty")
  314. } else if err != nil {
  315. panic(err)
  316. } else {
  317. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  318. if err := ws.WriteMessage(websocket.TextMessage, []byte(latestbeacons)); err != nil {
  319. return
  320. }
  321. }
  322. case <-pingTicker.C:
  323. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  324. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  325. return
  326. }
  327. }
  328. }
  329. }
  330. func reader(ws *websocket.Conn) {
  331. defer ws.Close()
  332. ws.SetReadLimit(512)
  333. ws.SetReadDeadline(time.Now().Add(60 * time.Second))
  334. ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(60 * time.Second)); return nil })
  335. for {
  336. _, _, err := ws.ReadMessage()
  337. if err != nil {
  338. break
  339. }
  340. }
  341. }
  342. func handleConnections(clients map[*websocket.Conn]bool, broadcast chan model.Message) http.HandlerFunc {
  343. return func(w http.ResponseWriter, r *http.Request) {
  344. ws, err := upgrader.Upgrade(w, r, nil)
  345. if err != nil {
  346. log.Fatal(err)
  347. }
  348. defer ws.Close()
  349. clients[ws] = true
  350. for {
  351. var msg model.Message
  352. err := ws.ReadJSON(&msg)
  353. if err != nil {
  354. log.Printf("error: %v", err)
  355. delete(clients, ws)
  356. break
  357. }
  358. broadcast <- msg
  359. }
  360. }
  361. }