|
- package main
-
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/AFASystems/presence/internal/pkg/config"
	"github.com/AFASystems/presence/internal/pkg/kafkaclient"
	"github.com/AFASystems/presence/internal/pkg/model"
	"github.com/gorilla/handlers"
	"github.com/gorilla/mux"
	"github.com/gorilla/websocket"
	"github.com/redis/go-redis/v9"
	"github.com/segmentio/kafka-go"
)
-
- var upgrader = websocket.Upgrader{
- CheckOrigin: func(r *http.Request) bool { return true },
- }
-
- func main() {
- HttpServer("0.0.0.0:1902")
- }
-
- func HttpServer(addr string) {
- cfg := config.Load()
-
- headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
- originsOk := handlers.AllowedOrigins([]string{"*"})
- methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})
-
- // Kafka writer that relays messages
- writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "apibeacons")
- defer writer.Close()
-
- settingsWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "settings")
- defer settingsWriter.Close()
-
- // Kafka reader for Raw MQTT beacons
- locationReader := kafkaclient.KafkaReader(cfg.KafkaURL, "locevents", "gid-loc-serv")
- defer locationReader.Close()
-
- // Kafka reader for API server updates
- alertsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
- defer alertsReader.Close()
-
- client := redis.NewClient(&redis.Options{
- Addr: "127.0.0.1:6379",
- Password: "",
- })
-
- ctx := context.Background()
-
- // Separate channel for location change?
- chLoc := make(chan model.HTTPLocation, 200)
- chEvents := make(chan model.BeaconEvent, 500)
-
- go kafkaclient.Consume(locationReader, chLoc)
- go kafkaclient.Consume(alertsReader, chEvents)
-
- go func() {
- for {
- select {
- case msg := <-chLoc:
- key := fmt.Sprintf("beacon:%s", msg.ID)
- hashM, err := msg.RedisHashable()
- if err != nil {
- fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
- continue
- }
- err = client.HSet(ctx, key, hashM).Err()
- if err != nil {
- fmt.Println("Error in persisting set in Redis key: ", key)
- continue
- }
- case msg := <-chEvents:
- key := fmt.Sprintf("beacon:%s", msg.ID)
- hashM, err := msg.RedisHashable()
- if err != nil {
- fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
- continue
- }
- err = client.HSet(ctx, key, hashM).Err()
- if err != nil {
- fmt.Println("Error in persisting set in Redis key: ", key)
- continue
- }
- }
- }
- }()
-
- r := mux.NewRouter()
-
- // declare WS clients list | do I need it though? or will locations worker send message
- // to kafka and then only this service (server) is being used for communication with the clients
- clients := make(map[*websocket.Conn]bool)
-
- // Declare broadcast channel
- broadcast := make(chan model.Message)
-
- // For now just add beacon DELETE / GET / POST / PUT methods
- r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE")
- r.HandleFunc("/api/beacons/{beacon_id}", beaconsListHandler(ctx, client)).Methods("GET")
- r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("POST")
- r.HandleFunc("/api/beacons", beaconsAddHandler(writer)).Methods("PUT")
-
- // r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET")
- r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST")
-
- // Handler for WS messages
- // No point in having seperate route for each message type, better to handle different message types in one connection
- // r.HandleFunc("/ws/api/beacons", serveWs(client))
- // r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client))
- r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast))
-
- http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
- }
-
- func beaconsListHandler(ctx context.Context, client *redis.Client) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- vars := mux.Vars(r)
- id := vars["beacon_id"]
- key := fmt.Sprintf("beacon:%s", id)
- beacon, err := client.HGetAll(ctx, key).Result()
- if err != nil {
- res := fmt.Sprintf("Error in getting beacon data (key: %s), error: %v", key, err)
- fmt.Println(res)
- http.Error(w, res, 500)
- }
-
- fmt.Printf("%+v", beacon)
- rData, err := json.Marshal(beacon)
- if err != nil {
- res := fmt.Sprintf("Error in marshaling beacon data (key: %s), error: %v", key, err)
- fmt.Println(res)
- http.Error(w, res, 500)
- }
-
- w.Write(rData)
- }
- }
-
- // Probably define value as interface and then reuse this writer in all of the functions
- func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) bool {
- valueStr, err := json.Marshal(&value)
- if err != nil {
- fmt.Println("error in encoding: ", err)
- return false
- }
- msg := kafka.Message{
- Value: valueStr,
- }
-
- err = writer.WriteMessages(context.Background(), msg)
- if err != nil {
- fmt.Println("Error in sending kafka message: ")
- return false
- }
-
- return true
- }
-
- func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- vars := mux.Vars(r)
- beaconId := vars["beacon_id"]
- apiUpdate := model.ApiUpdate{
- Method: "DELETE",
- ID: beaconId,
- }
-
- fmt.Println("Sending DELETE message")
-
- flag := sendKafkaMessage(writer, &apiUpdate)
- if !flag {
- fmt.Println("error in sending Kafka message")
- http.Error(w, "Error in sending kafka message", 500)
- return
- }
-
- w.Write([]byte("ok"))
- }
- }
-
- func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- decoder := json.NewDecoder(r.Body)
- var inBeacon model.Beacon
- err := decoder.Decode(&inBeacon)
-
- if err != nil {
- http.Error(w, err.Error(), 400)
- return
- }
-
- fmt.Println("sending POST message")
-
- if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.ID)) == 0) {
- http.Error(w, "name and beacon_id cannot be blank", 400)
- return
- }
-
- apiUpdate := model.ApiUpdate{
- Method: "POST",
- Beacon: inBeacon,
- }
-
- flag := sendKafkaMessage(writer, &apiUpdate)
- if !flag {
- fmt.Println("error in sending Kafka message")
- http.Error(w, "Error in sending kafka message", 500)
- return
- }
-
- w.Write([]byte("ok"))
- }
- }
-
- func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- decoder := json.NewDecoder(r.Body)
- var inSettings model.SettingsVal
- if err := decoder.Decode(&inSettings); err != nil {
- http.Error(w, err.Error(), 400)
- fmt.Println("Error in decoding Settings body: ", err)
- return
- }
-
- if !settingsCheck(inSettings) {
- http.Error(w, "values must be greater than 0", 400)
- fmt.Println("settings values must be greater than 0")
- return
- }
-
- valueStr, err := json.Marshal(&inSettings)
- if err != nil {
- http.Error(w, "Error in encoding settings", 500)
- fmt.Println("Error in encoding settings: ", err)
- return
- }
-
- msg := kafka.Message{
- Value: valueStr,
- }
-
- if err := writer.WriteMessages(context.Background(), msg); err != nil {
- fmt.Println("error in sending Kafka message")
- http.Error(w, "Error in sending kafka message", 500)
- return
- }
-
- w.Write([]byte("ok"))
- }
- }
-
- func settingsCheck(settings model.SettingsVal) bool {
- if settings.LocationConfidence <= 0 || settings.LastSeenThreshold <= 0 || settings.HASendInterval <= 0 {
- return false
- }
-
- return true
- }
-
- func serveWs(client *redis.Client) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- ws, err := upgrader.Upgrade(w, r, nil)
- if err != nil {
- if _, ok := err.(websocket.HandshakeError); !ok {
- log.Println(err)
- }
- return
- }
-
- go writer(ws, client)
- reader(ws)
- }
- }
-
- func writer(ws *websocket.Conn, client *redis.Client) {
- pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
- beaconTicker := time.NewTicker(2 * time.Second)
- defer func() {
- pingTicker.Stop()
- beaconTicker.Stop()
- ws.Close()
- }()
- for {
- select {
- case <-beaconTicker.C:
- httpresults, err := client.Get(context.Background(), "httpresults").Result()
- if err == redis.Nil {
- fmt.Println("no beacons list, starting empty")
- } else if err != nil {
- panic(err)
- } else {
- ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
- if err := ws.WriteMessage(websocket.TextMessage, []byte(httpresults)); err != nil {
- return
- }
- }
- case <-pingTicker.C:
- ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
- if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
- return
- }
- }
- }
- }
-
- func serveLatestBeaconsWs(client *redis.Client) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- ws, err := upgrader.Upgrade(w, r, nil)
- if err != nil {
- if _, ok := err.(websocket.HandshakeError); !ok {
- log.Println(err)
- }
- return
- }
-
- go latestBeaconWriter(ws, client)
- reader(ws)
- }
- }
-
- // This and writer can be refactored in one function
- func latestBeaconWriter(ws *websocket.Conn, client *redis.Client) {
- pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
- beaconTicker := time.NewTicker(2 * time.Second)
- defer func() {
- pingTicker.Stop()
- beaconTicker.Stop()
- ws.Close()
- }()
- for {
- select {
- case <-beaconTicker.C:
- latestbeacons, err := client.Get(context.Background(), "latestbeacons").Result()
- if err == redis.Nil {
- fmt.Println("no beacons list, starting empty")
- } else if err != nil {
- panic(err)
- } else {
- ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
- if err := ws.WriteMessage(websocket.TextMessage, []byte(latestbeacons)); err != nil {
- return
- }
- }
- case <-pingTicker.C:
- ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
- if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
- return
- }
- }
- }
- }
-
- func reader(ws *websocket.Conn) {
- defer ws.Close()
- ws.SetReadLimit(512)
- ws.SetReadDeadline(time.Now().Add(60 * time.Second))
- ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(60 * time.Second)); return nil })
- for {
- _, _, err := ws.ReadMessage()
- if err != nil {
- break
- }
- }
- }
-
- func handleConnections(clients map[*websocket.Conn]bool, broadcast chan model.Message) http.HandlerFunc {
- return func(w http.ResponseWriter, r *http.Request) {
- ws, err := upgrader.Upgrade(w, r, nil)
- if err != nil {
- log.Fatal(err)
- }
- defer ws.Close()
- clients[ws] = true
-
- for {
- var msg model.Message
- err := ws.ReadJSON(&msg)
- if err != nil {
- log.Printf("error: %v", err)
- delete(clients, ws)
- break
- }
- broadcast <- msg
- }
- }
- }
|