You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

329 rivejä
8.6 KiB

  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "net/http"
  8. "strings"
  9. "time"
  10. "github.com/AFASystems/presence/internal/pkg/kafkaclient"
  11. "github.com/AFASystems/presence/internal/pkg/model"
  12. "github.com/gorilla/handlers"
  13. "github.com/gorilla/mux"
  14. "github.com/gorilla/websocket"
  15. "github.com/redis/go-redis/v9"
  16. "github.com/segmentio/kafka-go"
  17. )
  18. var upgrader = websocket.Upgrader{
  19. CheckOrigin: func(r *http.Request) bool { return true },
  20. }
  21. func main() {
  22. HttpServer("0.0.0.0:1902")
  23. }
  24. func HttpServer(addr string) {
  25. headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
  26. originsOk := handlers.AllowedOrigins([]string{"*"})
  27. methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})
  28. // Kafka writer that relays messages
  29. writer := kafkaclient.KafkaWriter("kafka:9092", "apibeacons")
  30. defer writer.Close()
  31. settingsWriter := kafkaclient.KafkaWriter("kafka:9092", "settings")
  32. defer settingsWriter.Close()
  33. // Define if maybe ws writer should have more topics
  34. wsWriter := kafkaclient.KafkaWriter("kafka:9092", "wsmessages")
  35. defer wsWriter.Close()
  36. r := mux.NewRouter()
  37. // declare WS clients list | do I need it though? or will locations worker send message
  38. // to kafka and then only this service (server) is being used for communication with the clients
  39. clients := make(map[*websocket.Conn]bool)
  40. // Declare broadcast channel
  41. broadcast := make(chan model.Message)
  42. // For now just add beacon DELETE / GET / POST / PUT methods
  43. r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE")
  44. // r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET")
  45. r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST")
  46. r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT")
  47. // r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET")
  48. r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST")
  49. // Handler for WS messages
  50. // No point in having seperate route for each message type, better to handle different message types in one connection
  51. // r.HandleFunc("/ws/api/beacons", serveWs(client))
  52. // r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client))
  53. r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast))
  54. http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
  55. }
  56. // Probably define value as interface and then reuse this writer in all of the functions
  57. func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) bool {
  58. valueStr, err := json.Marshal(&value)
  59. if err != nil {
  60. fmt.Println("error in encoding: ", err)
  61. return false
  62. }
  63. msg := kafka.Message{
  64. Value: valueStr,
  65. }
  66. err = writer.WriteMessages(context.Background(), msg)
  67. if err != nil {
  68. fmt.Println("Error in sending kafka message: ")
  69. return false
  70. }
  71. return true
  72. }
  73. func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc {
  74. return func(w http.ResponseWriter, r *http.Request) {
  75. vars := mux.Vars(r)
  76. beaconId := vars["beacon_id"]
  77. apiUpdate := model.ApiUpdate{
  78. Method: "DELETE",
  79. ID: beaconId,
  80. }
  81. fmt.Println("Sending DELETE message")
  82. flag := sendKafkaMessage(writer, &apiUpdate)
  83. if !flag {
  84. fmt.Println("error in sending Kafka message")
  85. http.Error(w, "Error in sending kafka message", 500)
  86. return
  87. }
  88. w.Write([]byte("ok"))
  89. }
  90. }
  91. func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc {
  92. return func(w http.ResponseWriter, r *http.Request) {
  93. decoder := json.NewDecoder(r.Body)
  94. var inBeacon model.Beacon
  95. err := decoder.Decode(&inBeacon)
  96. if err != nil {
  97. http.Error(w, err.Error(), 400)
  98. return
  99. }
  100. fmt.Println("sending POST message")
  101. if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.Beacon_id)) == 0) {
  102. http.Error(w, "name and beacon_id cannot be blank", 400)
  103. return
  104. }
  105. apiUpdate := model.ApiUpdate{
  106. Method: "POST",
  107. Beacon: inBeacon,
  108. }
  109. flag := sendKafkaMessage(writer, &apiUpdate)
  110. if !flag {
  111. fmt.Println("error in sending Kafka message")
  112. http.Error(w, "Error in sending kafka message", 500)
  113. return
  114. }
  115. w.Write([]byte("ok"))
  116. }
  117. }
  118. func beaconsListHandler(client *redis.Client) http.HandlerFunc {
  119. return func(w http.ResponseWriter, r *http.Request) {
  120. }
  121. }
  122. func settingsListHandler(client *redis.Client) http.HandlerFunc {
  123. return func(w http.ResponseWriter, r *http.Request) {}
  124. }
  125. func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc {
  126. return func(w http.ResponseWriter, r *http.Request) {
  127. decoder := json.NewDecoder(r.Body)
  128. var inSettings model.SettingsVal
  129. if err := decoder.Decode(&inSettings); err != nil {
  130. http.Error(w, err.Error(), 400)
  131. fmt.Println("Error in decoding Settings body: ", err)
  132. return
  133. }
  134. if !settingsCheck(inSettings) {
  135. http.Error(w, "values must be greater than 0", 400)
  136. fmt.Println("settings values must be greater than 0")
  137. return
  138. }
  139. valueStr, err := json.Marshal(&inSettings)
  140. if err != nil {
  141. http.Error(w, "Error in encoding settings", 500)
  142. fmt.Println("Error in encoding settings: ", err)
  143. return
  144. }
  145. msg := kafka.Message{
  146. Value: valueStr,
  147. }
  148. if err := writer.WriteMessages(context.Background(), msg); err != nil {
  149. fmt.Println("error in sending Kafka message")
  150. http.Error(w, "Error in sending kafka message", 500)
  151. return
  152. }
  153. w.Write([]byte("ok"))
  154. }
  155. }
  156. func settingsCheck(settings model.SettingsVal) bool {
  157. if settings.Location_confidence <= 0 || settings.Last_seen_threshold <= 0 || settings.HA_send_interval <= 0 {
  158. return false
  159. }
  160. return true
  161. }
  162. func serveWs(client *redis.Client) http.HandlerFunc {
  163. return func(w http.ResponseWriter, r *http.Request) {
  164. ws, err := upgrader.Upgrade(w, r, nil)
  165. if err != nil {
  166. if _, ok := err.(websocket.HandshakeError); !ok {
  167. log.Println(err)
  168. }
  169. return
  170. }
  171. go writer(ws, client)
  172. reader(ws)
  173. }
  174. }
  175. func writer(ws *websocket.Conn, client *redis.Client) {
  176. pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
  177. beaconTicker := time.NewTicker(2 * time.Second)
  178. defer func() {
  179. pingTicker.Stop()
  180. beaconTicker.Stop()
  181. ws.Close()
  182. }()
  183. for {
  184. select {
  185. case <-beaconTicker.C:
  186. httpresults, err := client.Get(context.Background(), "httpresults").Result()
  187. if err == redis.Nil {
  188. fmt.Println("no beacons list, starting empty")
  189. } else if err != nil {
  190. panic(err)
  191. } else {
  192. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  193. if err := ws.WriteMessage(websocket.TextMessage, []byte(httpresults)); err != nil {
  194. return
  195. }
  196. }
  197. case <-pingTicker.C:
  198. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  199. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  200. return
  201. }
  202. }
  203. }
  204. }
  205. func serveLatestBeaconsWs(client *redis.Client) http.HandlerFunc {
  206. return func(w http.ResponseWriter, r *http.Request) {
  207. ws, err := upgrader.Upgrade(w, r, nil)
  208. if err != nil {
  209. if _, ok := err.(websocket.HandshakeError); !ok {
  210. log.Println(err)
  211. }
  212. return
  213. }
  214. go latestBeaconWriter(ws, client)
  215. reader(ws)
  216. }
  217. }
  218. // This and writer can be refactored in one function
  219. func latestBeaconWriter(ws *websocket.Conn, client *redis.Client) {
  220. pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
  221. beaconTicker := time.NewTicker(2 * time.Second)
  222. defer func() {
  223. pingTicker.Stop()
  224. beaconTicker.Stop()
  225. ws.Close()
  226. }()
  227. for {
  228. select {
  229. case <-beaconTicker.C:
  230. latestbeacons, err := client.Get(context.Background(), "latestbeacons").Result()
  231. if err == redis.Nil {
  232. fmt.Println("no beacons list, starting empty")
  233. } else if err != nil {
  234. panic(err)
  235. } else {
  236. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  237. if err := ws.WriteMessage(websocket.TextMessage, []byte(latestbeacons)); err != nil {
  238. return
  239. }
  240. }
  241. case <-pingTicker.C:
  242. ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
  243. if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
  244. return
  245. }
  246. }
  247. }
  248. }
  249. func reader(ws *websocket.Conn) {
  250. defer ws.Close()
  251. ws.SetReadLimit(512)
  252. ws.SetReadDeadline(time.Now().Add(60 * time.Second))
  253. ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(60 * time.Second)); return nil })
  254. for {
  255. _, _, err := ws.ReadMessage()
  256. if err != nil {
  257. break
  258. }
  259. }
  260. }
  261. func handleConnections(clients map[*websocket.Conn]bool, broadcast chan model.Message) http.HandlerFunc {
  262. return func(w http.ResponseWriter, r *http.Request) {
  263. ws, err := upgrader.Upgrade(w, r, nil)
  264. if err != nil {
  265. log.Fatal(err)
  266. }
  267. defer ws.Close()
  268. clients[ws] = true
  269. for {
  270. var msg model.Message
  271. err := ws.ReadJSON(&msg)
  272. if err != nil {
  273. log.Printf("error: %v", err)
  274. delete(clients, ws)
  275. break
  276. }
  277. broadcast <- msg
  278. }
  279. }
  280. }