Du kan inte välja fler än 25 ämnen. Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.
 
 
 
 

427 rader
12 KiB

  1. package main
  import (
  	"context"
  	"encoding/json"
  	"fmt"
  	"log"
  	"net/http"
  	"strings"
  	"sync"
  	"time"

  	"github.com/AFASystems/presence/internal/pkg/common/appcontext"
  	"github.com/AFASystems/presence/internal/pkg/config"
  	"github.com/AFASystems/presence/internal/pkg/kafkaclient"
  	"github.com/AFASystems/presence/internal/pkg/model"
  	"github.com/gorilla/handlers"
  	"github.com/gorilla/mux"
  	"github.com/gorilla/websocket"
  	"github.com/redis/go-redis/v9"
  	"github.com/segmentio/kafka-go"
  )
  20. var upgrader = websocket.Upgrader{
  21. CheckOrigin: func(r *http.Request) bool { return true },
  22. }
  23. func main() {
  24. HttpServer("0.0.0.0:1902")
  25. }
  26. func HttpServer(addr string) {
  27. cfg := config.Load()
  28. appState := appcontext.NewAppState()
  29. headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
  30. originsOk := handlers.AllowedOrigins([]string{"*"})
  31. methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})
  32. // Kafka writer that relays messages
  33. writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "apibeacons")
  34. defer writer.Close()
  35. settingsWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "settings")
  36. defer settingsWriter.Close()
  37. // Kafka reader for Raw MQTT beacons
  38. locationReader := kafkaclient.KafkaReader(cfg.KafkaURL, "locevents", "gid-loc-serv")
  39. defer locationReader.Close()
  40. // Kafka reader for API server updates
  41. alertsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
  42. defer alertsReader.Close()
  43. client := redis.NewClient(&redis.Options{
  44. Addr: "127.0.0.1:6379",
  45. Password: "",
  46. })
  47. ctx := context.Background()
  48. // Separate channel for location change?
  49. chLoc := make(chan model.HTTPLocation, 200)
  50. chEvents := make(chan model.BeaconEvent, 500)
  51. go kafkaclient.Consume(locationReader, chLoc)
  52. go kafkaclient.Consume(alertsReader, chEvents)
  53. go func() {
  54. for {
  55. select {
  56. case msg := <-chLoc:
  57. beacon, ok := appState.GetBeacon(msg.ID)
  58. if !ok {
  59. appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation})
  60. } else {
  61. beacon.ID = msg.ID
  62. beacon.Location = msg.Location
  63. beacon.Distance = msg.Distance
  64. beacon.LastSeen = msg.LastSeen
  65. beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation
  66. appState.UpdateBeacon(msg.ID, beacon)
  67. }
  68. key := fmt.Sprintf("beacon:%s", msg.ID)
  69. hashM, err := msg.RedisHashable()
  70. if err != nil {
  71. fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
  72. continue
  73. }
  74. if err := client.HSet(ctx, key, hashM).Err(); err != nil {
  75. fmt.Println("Error in persisting set in Redis key: ", key)
  76. continue
  77. }
  78. if err := client.SAdd(ctx, "beacons", key).Err(); err != nil {
  79. fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err)
  80. }
  81. case msg := <-chEvents:
  82. beacon, ok := appState.GetBeacon(msg.ID)
  83. if !ok {
  84. appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, BeaconType: msg.Type, HSBattery: int64(msg.Battery), Event: msg.Event})
  85. } else {
  86. beacon.ID = msg.ID
  87. beacon.BeaconType = msg.Type
  88. beacon.HSBattery = int64(msg.Battery)
  89. beacon.Event = msg.Event
  90. appState.UpdateBeacon(msg.ID, beacon)
  91. }
  92. key := fmt.Sprintf("beacon:%s", msg.ID)
  93. hashM, err := msg.RedisHashable()
  94. if err != nil {
  95. fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
  96. continue
  97. }
  98. if err := client.HSet(ctx, key, hashM).Err(); err != nil {
  99. fmt.Println("Error in persisting set in Redis key: ", key)
  100. continue
  101. }
  102. if err := client.SAdd(ctx, "beacons", key).Err(); err != nil {
  103. fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err)
  104. }
  105. }
  106. }
  107. }()
  108. r := mux.NewRouter()
  109. // declare WS clients list | do I need it though? or will locations worker send message
  110. // to kafka and then only this service (server) is being used for communication with the clients
  111. clients := make(map[*websocket.Conn]bool)
  112. // Declare broadcast channel
  113. broadcast := make(chan model.Message)
  114. // For now just add beacon DELETE / GET / POST / PUT methods
  115. r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE")
  116. r.HandleFunc("/api/beacons", beaconsListHandler(appState)).Methods("GET")
  117. r.HandleFunc("/api/beacons/{beacon_id}", beaconsListSingleHandler(appState)).Methods("GET")
  118. r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST")
  119. r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT")
  120. // r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET")
  121. r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST")
  122. // Handler for WS messages
  123. // No point in having seperate route for each message type, better to handle different message types in one connection
  124. // r.HandleFunc("/ws/api/beacons", serveWs(client))
  125. // r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client))
  126. r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast))
  127. http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
  128. }
  129. func beaconsListSingleHandler(appstate *appcontext.AppState) http.HandlerFunc {
  130. return func(w http.ResponseWriter, r *http.Request) {
  131. vars := mux.Vars(r)
  132. id := vars["beacon_id"]
  133. beacon, ok := appstate.GetBeacon(id)
  134. if !ok {
  135. w.Header().Set("Content-Type", "application/json")
  136. w.WriteHeader(http.StatusNotFound)
  137. json.NewEncoder(w).Encode(map[string]string{"error": "Beacon not found"})
  138. return
  139. }
  140. w.Header().Set("Content-Type", "application/json")
  141. w.WriteHeader(http.StatusOK)
  142. json.NewEncoder(w).Encode(beacon)
  143. }
  144. }
  145. func beaconsListHandler(appstate *appcontext.AppState) http.HandlerFunc {
  146. return func(w http.ResponseWriter, r *http.Request) {
  147. beacons := appstate.GetAllBeacons()
  148. w.Header().Set("Content-Type", "application/json")
  149. w.WriteHeader(http.StatusOK)
  150. json.NewEncoder(w).Encode(beacons)
  151. }
  152. }
  153. // Probably define value as interface and then reuse this writer in all of the functions
  154. func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) bool {
  155. valueStr, err := json.Marshal(&value)
  156. if err != nil {
  157. fmt.Println("error in encoding: ", err)
  158. return false
  159. }
  160. msg := kafka.Message{
  161. Value: valueStr,
  162. }
  163. err = writer.WriteMessages(context.Background(), msg)
  164. if err != nil {
  165. fmt.Println("Error in sending kafka message: ")
  166. return false
  167. }
  168. return true
  169. }
  170. func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc {
  171. return func(w http.ResponseWriter, r *http.Request) {
  172. vars := mux.Vars(r)
  173. beaconId := vars["beacon_id"]
  174. apiUpdate := model.ApiUpdate{
  175. Method: "DELETE",
  176. ID: beaconId,
  177. }
  178. fmt.Println("Sending DELETE message")
  179. flag := sendKafkaMessage(writer, &apiUpdate)
  180. if !flag {
  181. fmt.Println("error in sending Kafka message")
  182. http.Error(w, "Error in sending kafka message", 500)
  183. return
  184. }
  185. w.Write([]byte("ok"))
  186. }
  187. }
  188. func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc {
  189. return func(w http.ResponseWriter, r *http.Request) {
  190. decoder := json.NewDecoder(r.Body)
  191. var inBeacon model.Beacon
  192. err := decoder.Decode(&inBeacon)
  193. if err != nil {
  194. http.Error(w, err.Error(), 400)
  195. return
  196. }
  197. fmt.Println("sending POST message")
  198. if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.ID)) == 0) {
  199. http.Error(w, "name and beacon_id cannot be blank", 400)
  200. return
  201. }
  202. apiUpdate := model.ApiUpdate{
  203. Method: "POST",
  204. Beacon: inBeacon,
  205. }
  206. flag := sendKafkaMessage(writer, &apiUpdate)
  207. if !flag {
  208. fmt.Println("error in sending Kafka message")
  209. http.Error(w, "Error in sending kafka message", 500)
  210. return
  211. }
  212. w.Write([]byte("ok"))
  213. }
  214. }
  215. func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc {
  216. return func(w http.ResponseWriter, r *http.Request) {
  217. decoder := json.NewDecoder(r.Body)
  218. var inSettings model.SettingsVal
  219. if err := decoder.Decode(&inSettings); err != nil {
  220. http.Error(w, err.Error(), 400)
  221. fmt.Println("Error in decoding Settings body: ", err)
  222. return
  223. }
  224. if !settingsCheck(inSettings) {
  225. http.Error(w, "values must be greater than 0", 400)
  226. fmt.Println("settings values must be greater than 0")
  227. return
  228. }
  229. valueStr, err := json.Marshal(&inSettings)
  230. if err != nil {
  231. http.Error(w, "Error in encoding settings", 500)
  232. fmt.Println("Error in encoding settings: ", err)
  233. return
  234. }
  235. msg := kafka.Message{
  236. Value: valueStr,
  237. }
  238. if err := writer.WriteMessages(context.Background(), msg); err != nil {
  239. fmt.Println("error in sending Kafka message")
  240. http.Error(w, "Error in sending kafka message", 500)
  241. return
  242. }
  243. w.Write([]byte("ok"))
  244. }
  245. }
  246. func settingsCheck(settings model.SettingsVal) bool {
  247. if settings.LocationConfidence <= 0 || settings.LastSeenThreshold <= 0 || settings.HASendInterval <= 0 {
  248. return false
  249. }
  250. return true
  251. }
  252. func serveWs(client *redis.Client) http.HandlerFunc {
  253. return func(w http.ResponseWriter, r *http.Request) {
  254. ws, err := upgrader.Upgrade(w, r, nil)
  255. if err != nil {
  256. if _, ok := err.(websocket.HandshakeError); !ok {
  257. log.Println(err)
  258. }
  259. return
  260. }
  261. go writer(ws, client)
  262. reader(ws)
  263. }
  264. }
  265. func writer(ws *websocket.Conn, client *redis.Client) {
  266. pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
  267. beaconTicker := time.NewTicker(2 * time.Second)
  268. defer func() {
  269. pingTicker.Stop()
  270. beaconTicker.Stop()
  271. ws.Close()
  272. }()
  273. for {
  274. select {
  275. case <-beaconTicker.C:
  276. httpresults, err := client.Get(context.Background(), "httpresults").Result()
  277. if err == redis.Nil {
  278. fmt.Println("no beacons list, starting empty")
  279. } else if err != nil {
  280. panic(err)
  281. } else {
  282. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  283. if err := ws.WriteMessage(websocket.TextMessage, []byte(httpresults)); err != nil {
  284. return
  285. }
  286. }
  287. case <-pingTicker.C:
  288. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  289. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  290. return
  291. }
  292. }
  293. }
  294. }
  295. func serveLatestBeaconsWs(client *redis.Client) http.HandlerFunc {
  296. return func(w http.ResponseWriter, r *http.Request) {
  297. ws, err := upgrader.Upgrade(w, r, nil)
  298. if err != nil {
  299. if _, ok := err.(websocket.HandshakeError); !ok {
  300. log.Println(err)
  301. }
  302. return
  303. }
  304. go latestBeaconWriter(ws, client)
  305. reader(ws)
  306. }
  307. }
  308. // This and writer can be refactored in one function
  309. func latestBeaconWriter(ws *websocket.Conn, client *redis.Client) {
  310. pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
  311. beaconTicker := time.NewTicker(2 * time.Second)
  312. defer func() {
  313. pingTicker.Stop()
  314. beaconTicker.Stop()
  315. ws.Close()
  316. }()
  317. for {
  318. select {
  319. case <-beaconTicker.C:
  320. latestbeacons, err := client.Get(context.Background(), "latestbeacons").Result()
  321. if err == redis.Nil {
  322. fmt.Println("no beacons list, starting empty")
  323. } else if err != nil {
  324. panic(err)
  325. } else {
  326. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  327. if err := ws.WriteMessage(websocket.TextMessage, []byte(latestbeacons)); err != nil {
  328. return
  329. }
  330. }
  331. case <-pingTicker.C:
  332. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  333. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  334. return
  335. }
  336. }
  337. }
  338. }
  339. func reader(ws *websocket.Conn) {
  340. defer ws.Close()
  341. ws.SetReadLimit(512)
  342. ws.SetReadDeadline(time.Now().Add(60 * time.Second))
  343. ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(60 * time.Second)); return nil })
  344. for {
  345. _, _, err := ws.ReadMessage()
  346. if err != nil {
  347. break
  348. }
  349. }
  350. }
  351. func handleConnections(clients map[*websocket.Conn]bool, broadcast chan model.Message) http.HandlerFunc {
  352. return func(w http.ResponseWriter, r *http.Request) {
  353. ws, err := upgrader.Upgrade(w, r, nil)
  354. if err != nil {
  355. log.Fatal(err)
  356. }
  357. defer ws.Close()
  358. clients[ws] = true
  359. for {
  360. var msg model.Message
  361. err := ws.ReadJSON(&msg)
  362. if err != nil {
  363. log.Printf("error: %v", err)
  364. delete(clients, ws)
  365. break
  366. }
  367. broadcast <- msg
  368. }
  369. }
  370. }