Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.
 
 
 
 

351 linhas
9.2 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "net/http"
  8. "strings"
  9. "time"
  10. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  11. "github.com/AFASystems/presence/internal/pkg/model"
  12. "github.com/gorilla/handlers"
  13. "github.com/gorilla/mux"
  14. "github.com/gorilla/websocket"
  15. "github.com/redis/go-redis/v9"
  16. "github.com/segmentio/kafka-go"
  17. )
  18. var upgrader = websocket.Upgrader{
  19. CheckOrigin: func(r *http.Request) bool { return true },
  20. }
  21. func main() {
  22. HttpServer("0.0.0.0:1902")
  23. }
  24. func HttpServer(addr string) {
  25. headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
  26. originsOk := handlers.AllowedOrigins([]string{"*"})
  27. methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})
  28. // Kafka writer that relays messages
  29. writer := kafkaclient.KafkaWriter("kafka:9092", "apibeacons")
  30. defer writer.Close()
  31. settingsWriter := kafkaclient.KafkaWriter("kafka:9092", "settings")
  32. defer settingsWriter.Close()
  33. // Define if maybe ws writer should have more topics
  34. wsWriter := kafkaclient.KafkaWriter("kafka:9092", "wsmessages")
  35. defer wsWriter.Close()
  36. r := mux.NewRouter()
  37. client := redis.NewClient(&redis.Options{
  38. Addr: "valkey:6379",
  39. Password: "",
  40. })
  41. // declare WS clients list | do I need it though? or will locations worker send message
  42. // to kafka and then only this service (server) is being used for communication with the clients
  43. clients := make(map[*websocket.Conn]bool)
  44. // Declare broadcast channel
  45. broadcast := make(chan model.Message)
  46. // For now just add beacon DELETE / GET / POST / PUT methods
  47. r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE")
  48. r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET")
  49. r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST")
  50. r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT")
  51. r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET")
  52. r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST")
  53. // Handler for WS messages
  54. // No point in having seperate route for each message type, better to handle different message types in one connection
  55. r.HandleFunc("/ws/api/beacons", serveWs(client))
  56. r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client))
  57. r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast))
  58. http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
  59. }
  60. // Probably define value as interface and then reuse this writer in all of the functions
  61. func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) bool {
  62. valueStr, err := json.Marshal(&value)
  63. if err != nil {
  64. fmt.Println("error in encoding: ", err)
  65. return false
  66. }
  67. msg := kafka.Message{
  68. Value: valueStr,
  69. }
  70. err = writer.WriteMessages(context.Background(), msg)
  71. if err != nil {
  72. fmt.Println("Error in sending kafka message: ")
  73. return false
  74. }
  75. return true
  76. }
  77. func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc {
  78. return func(w http.ResponseWriter, r *http.Request) {
  79. vars := mux.Vars(r)
  80. beaconId := vars["beacon_id"]
  81. apiUpdate := model.ApiUpdate{
  82. Method: "DELETE",
  83. ID: beaconId,
  84. }
  85. flag := sendKafkaMessage(writer, &apiUpdate)
  86. if !flag {
  87. fmt.Println("error in sending Kafka message")
  88. http.Error(w, "Error in sending kafka message", 500)
  89. return
  90. }
  91. w.Write([]byte("ok"))
  92. }
  93. }
  94. func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc {
  95. return func(w http.ResponseWriter, r *http.Request) {
  96. decoder := json.NewDecoder(r.Body)
  97. var inBeacon model.Beacon
  98. err := decoder.Decode(&inBeacon)
  99. if err != nil {
  100. http.Error(w, err.Error(), 400)
  101. return
  102. }
  103. if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.Beacon_id)) == 0) {
  104. http.Error(w, "name and beacon_id cannot be blank", 400)
  105. return
  106. }
  107. apiUpdate := model.ApiUpdate{
  108. Method: "POST",
  109. Beacon: inBeacon,
  110. }
  111. flag := sendKafkaMessage(writer, &apiUpdate)
  112. if !flag {
  113. fmt.Println("error in sending Kafka message")
  114. http.Error(w, "Error in sending kafka message", 500)
  115. return
  116. }
  117. w.Write([]byte("ok"))
  118. }
  119. }
  120. func beaconsListHandler(client *redis.Client) http.HandlerFunc {
  121. return func(w http.ResponseWriter, r *http.Request) {
  122. beaconsList, err := client.Get(context.Background(), "beaconsList").Result()
  123. if err == redis.Nil {
  124. fmt.Println("no beacons list, starting empty")
  125. http.Error(w, "list is empty", 500)
  126. } else if err != nil {
  127. http.Error(w, "Internal server error", 500)
  128. panic(err)
  129. } else {
  130. w.Write([]byte(beaconsList))
  131. }
  132. }
  133. }
  134. func settingsListHandler(client *redis.Client) http.HandlerFunc {
  135. return func(w http.ResponseWriter, r *http.Request) {
  136. settings, err := client.Get(context.Background(), "settings").Result()
  137. if err == redis.Nil {
  138. fmt.Println("no settings persisted, starting empty")
  139. http.Error(w, "list is empty", 500)
  140. } else if err != nil {
  141. http.Error(w, "Internal server error", 500)
  142. panic(err)
  143. } else {
  144. w.Write([]byte(settings))
  145. }
  146. }
  147. }
  148. func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc {
  149. return func(w http.ResponseWriter, r *http.Request) {
  150. decoder := json.NewDecoder(r.Body)
  151. var inSettings model.SettingsVal
  152. if err := decoder.Decode(&inSettings); err != nil {
  153. http.Error(w, err.Error(), 400)
  154. fmt.Println("Error in decoding Settings body: ", err)
  155. return
  156. }
  157. if !settingsCheck(inSettings) {
  158. http.Error(w, "values must be greater than 0", 400)
  159. fmt.Println("settings values must be greater than 0")
  160. return
  161. }
  162. valueStr, err := json.Marshal(&inSettings)
  163. if err != nil {
  164. http.Error(w, "Error in encoding settings", 500)
  165. fmt.Println("Error in encoding settings: ", err)
  166. return
  167. }
  168. msg := kafka.Message{
  169. Value: valueStr,
  170. }
  171. if err := writer.WriteMessages(context.Background(), msg); err != nil {
  172. fmt.Println("error in sending Kafka message")
  173. http.Error(w, "Error in sending kafka message", 500)
  174. return
  175. }
  176. w.Write([]byte("ok"))
  177. }
  178. }
  179. func settingsCheck(settings model.SettingsVal) bool {
  180. if settings.Location_confidence <= 0 || settings.Last_seen_threshold <= 0 || settings.HA_send_interval <= 0 {
  181. return false
  182. }
  183. return true
  184. }
  185. func serveWs(client *redis.Client) http.HandlerFunc {
  186. return func(w http.ResponseWriter, r *http.Request) {
  187. ws, err := upgrader.Upgrade(w, r, nil)
  188. if err != nil {
  189. if _, ok := err.(websocket.HandshakeError); !ok {
  190. log.Println(err)
  191. }
  192. return
  193. }
  194. go writer(ws, client)
  195. reader(ws)
  196. }
  197. }
  198. func writer(ws *websocket.Conn, client *redis.Client) {
  199. pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
  200. beaconTicker := time.NewTicker(2 * time.Second)
  201. defer func() {
  202. pingTicker.Stop()
  203. beaconTicker.Stop()
  204. ws.Close()
  205. }()
  206. for {
  207. select {
  208. case <-beaconTicker.C:
  209. httpresults, err := client.Get(context.Background(), "httpresults").Result()
  210. if err == redis.Nil {
  211. fmt.Println("no beacons list, starting empty")
  212. } else if err != nil {
  213. panic(err)
  214. } else {
  215. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  216. if err := ws.WriteMessage(websocket.TextMessage, []byte(httpresults)); err != nil {
  217. return
  218. }
  219. }
  220. case <-pingTicker.C:
  221. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  222. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  223. return
  224. }
  225. }
  226. }
  227. }
  228. func serveLatestBeaconsWs(client *redis.Client) http.HandlerFunc {
  229. return func(w http.ResponseWriter, r *http.Request) {
  230. ws, err := upgrader.Upgrade(w, r, nil)
  231. if err != nil {
  232. if _, ok := err.(websocket.HandshakeError); !ok {
  233. log.Println(err)
  234. }
  235. return
  236. }
  237. go latestBeaconWriter(ws, client)
  238. reader(ws)
  239. }
  240. }
  241. // This and writer can be refactored in one function
  242. func latestBeaconWriter(ws *websocket.Conn, client *redis.Client) {
  243. pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
  244. beaconTicker := time.NewTicker(2 * time.Second)
  245. defer func() {
  246. pingTicker.Stop()
  247. beaconTicker.Stop()
  248. ws.Close()
  249. }()
  250. for {
  251. select {
  252. case <-beaconTicker.C:
  253. latestbeacons, err := client.Get(context.Background(), "latestbeacons").Result()
  254. if err == redis.Nil {
  255. fmt.Println("no beacons list, starting empty")
  256. } else if err != nil {
  257. panic(err)
  258. } else {
  259. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  260. if err := ws.WriteMessage(websocket.TextMessage, []byte(latestbeacons)); err != nil {
  261. return
  262. }
  263. }
  264. case <-pingTicker.C:
  265. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  266. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  267. return
  268. }
  269. }
  270. }
  271. }
  272. func reader(ws *websocket.Conn) {
  273. defer ws.Close()
  274. ws.SetReadLimit(512)
  275. ws.SetReadDeadline(time.Now().Add(60 * time.Second))
  276. ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(60 * time.Second)); return nil })
  277. for {
  278. _, _, err := ws.ReadMessage()
  279. if err != nil {
  280. break
  281. }
  282. }
  283. }
  284. func handleConnections(clients map[*websocket.Conn]bool, broadcast chan model.Message) http.HandlerFunc {
  285. return func(w http.ResponseWriter, r *http.Request) {
  286. ws, err := upgrader.Upgrade(w, r, nil)
  287. if err != nil {
  288. log.Fatal(err)
  289. }
  290. defer ws.Close()
  291. clients[ws] = true
  292. for {
  293. var msg model.Message
  294. err := ws.ReadJSON(&msg)
  295. if err != nil {
  296. log.Printf("error: %v", err)
  297. delete(clients, ws)
  298. break
  299. }
  300. broadcast <- msg
  301. }
  302. }
  303. }