// Package main runs the presence API server. It exposes REST endpoints under
// /api that relay beacon and settings updates to Kafka, mirrors messages read
// from Kafka into Redis hashes, and serves a WebSocket endpoint.
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/AFASystems/presence/internal/pkg/config"
	"github.com/AFASystems/presence/internal/pkg/kafkaclient"
	"github.com/AFASystems/presence/internal/pkg/model"
	"github.com/gorilla/handlers"
	"github.com/gorilla/mux"
	"github.com/gorilla/websocket"
	"github.com/redis/go-redis/v9"
	"github.com/segmentio/kafka-go"
)

// WebSocket timing and size limits shared by the reader and writer loops.
const (
	// pongWait is how long a connection may stay silent before the read
	// deadline expires.
	pongWait = 60 * time.Second
	// pingPeriod is the interval between pings; it must be shorter than
	// pongWait so a pong can arrive before the read deadline.
	pingPeriod = (pongWait * 9) / 10
	// writeWait bounds a single WebSocket write.
	writeWait = 10 * time.Second
	// pushPeriod is how often the Redis-backed payload is pushed to a client.
	pushPeriod = 2 * time.Second
	// maxMessageSize is the read limit (in bytes) for inbound messages.
	maxMessageSize = 512
)

var (
	// upgrader upgrades HTTP requests to WebSocket connections.
	// CheckOrigin accepts every origin.
	// NOTE(review): consider restricting allowed origins before exposing this
	// service to untrusted networks.
	upgrader = websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool { return true },
	}

	// clientsMu guards the clients map handed to handleConnections. Every
	// HTTP request is served on its own goroutine, so unsynchronised map
	// writes would be a data race.
	clientsMu sync.Mutex
)

func main() {
	HttpServer("0.0.0.0:1902")
}

// HttpServer wires up the Kafka readers/writers, the Redis client and the HTTP
// routes, then serves on addr until the listener fails. It blocks for the
// lifetime of the server.
func HttpServer(addr string) {
	cfg := config.Load()

	headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
	originsOk := handlers.AllowedOrigins([]string{"*"})
	methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})

	// Kafka writer that relays beacon API updates ("apibeacons" topic).
	beaconsWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "apibeacons")
	defer beaconsWriter.Close()

	// Kafka writer for settings updates ("settings" topic).
	settingsWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "settings")
	defer settingsWriter.Close()

	// Kafka reader for the "locevents" topic.
	locationReader := kafkaclient.KafkaReader(cfg.KafkaURL, "locevents", "gid-loc-serv")
	defer locationReader.Close()

	// Kafka reader for the "alertbeacons" topic.
	alertsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
	defer alertsReader.Close()

	client := redis.NewClient(&redis.Options{
		Addr:     "127.0.0.1:6379",
		Password: "",
	})
	defer client.Close()

	ctx := context.Background()

	// Separate channel for location change?
	chLoc := make(chan model.HTTPLocation, 200)
	chEvents := make(chan model.BeaconEvent, 500)

	// NOTE(review): Consume presumably decodes each Kafka message into the
	// given channel; confirm against the kafkaclient package.
	go kafkaclient.Consume(locationReader, chLoc)
	go kafkaclient.Consume(alertsReader, chEvents)

	// Mirror every location and alert message into the Redis hash
	// "beacon:<ID>". This goroutine runs for the lifetime of the process.
	go func() {
		for {
			select {
			case msg := <-chLoc:
				hashM, err := msg.RedisHashable()
				if err != nil {
					fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
					continue
				}
				storeHash(ctx, client, fmt.Sprintf("beacon:%s", msg.ID), hashM)
			case msg := <-chEvents:
				hashM, err := msg.RedisHashable()
				if err != nil {
					fmt.Println("Error in converting event into hashmap for Redis insert: ", err)
					continue
				}
				storeHash(ctx, client, fmt.Sprintf("beacon:%s", msg.ID), hashM)
			}
		}
	}()

	r := mux.NewRouter()

	// declare WS clients list | do I need it though? or will locations worker send message
	// to kafka and then only this service (server) is being used for communication with the clients
	clients := make(map[*websocket.Conn]bool)

	// Declare broadcast channel
	broadcast := make(chan model.Message)

	// For now just add beacon DELETE / GET / POST / PUT methods
	r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(beaconsWriter)).Methods("DELETE")
	r.HandleFunc("/api/beacons/{beacon_id}", beaconsListHandler(ctx, client)).Methods("GET")
	r.HandleFunc("/api/beacons", beaconsAddHandler(beaconsWriter)).Methods("POST")
	r.HandleFunc("/api/beacons", beaconsAddHandler(beaconsWriter)).Methods("PUT")

	// r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET")
	r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST")

	// Handler for WS messages
	// No point in having separate route for each message type, better to handle different message types in one connection
	// r.HandleFunc("/ws/api/beacons", serveWs(client))
	// r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client))
	r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast))

	// Log rather than log.Fatal so the deferred Close calls above still run.
	if err := http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r)); err != nil {
		log.Printf("HTTP server on %s stopped: %v", addr, err)
	}
}

// storeHash writes hashM to the Redis hash at key and logs (without
// propagating) any failure.
func storeHash(ctx context.Context, client *redis.Client, key string, hashM any) {
	if err := client.HSet(ctx, key, hashM).Err(); err != nil {
		fmt.Println("Error in persisting set in Redis key: ", key, err)
	}
}

// beaconsListHandler returns a handler that reads the Redis hash
// "beacon:<beacon_id>" and writes it to the response as JSON. Any Redis or
// encoding failure yields a 500 and no body beyond the error text.
func beaconsListHandler(ctx context.Context, client *redis.Client) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		id := mux.Vars(r)["beacon_id"]
		key := fmt.Sprintf("beacon:%s", id)

		beacon, err := client.HGetAll(ctx, key).Result()
		if err != nil {
			res := fmt.Sprintf("Error in getting beacon data (key: %s), error: %v", key, err)
			fmt.Println(res)
			http.Error(w, res, http.StatusInternalServerError)
			// http.Error does not end the request; stop here so no
			// payload is written after the error response.
			return
		}
		fmt.Printf("%+v", beacon)

		rData, err := json.Marshal(beacon)
		if err != nil {
			res := fmt.Sprintf("Error in marshaling beacon data (key: %s), error: %v", key, err)
			fmt.Println(res)
			http.Error(w, res, http.StatusInternalServerError)
			return
		}
		w.Write(rData)
	}
}

// sendKafkaMessage JSON-encodes value and writes it as a single Kafka message.
// It reports whether the message was encoded and written successfully; failures
// are logged.
//
// Probably define value as interface and then reuse this writer in all of the functions
func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) bool {
	payload, err := json.Marshal(value)
	if err != nil {
		fmt.Println("error in encoding: ", err)
		return false
	}

	if err := writer.WriteMessages(context.Background(), kafka.Message{Value: payload}); err != nil {
		fmt.Println("Error in sending kafka message: ", err)
		return false
	}
	return true
}

// beaconsDeleteHandler returns a handler that publishes a DELETE update for
// the beacon named by the {beacon_id} route variable.
func beaconsDeleteHandler(writer *kafka.Writer) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		beaconID := mux.Vars(r)["beacon_id"]
		apiUpdate := model.ApiUpdate{
			Method: "DELETE",
			ID:     beaconID,
		}

		fmt.Println("Sending DELETE message")

		if !sendKafkaMessage(writer, &apiUpdate) {
			fmt.Println("error in sending Kafka message")
			http.Error(w, "Error in sending kafka message", http.StatusInternalServerError)
			return
		}
		w.Write([]byte("ok"))
	}
}

// beaconsAddHandler returns a handler that decodes a beacon from the request
// body, rejects it with 400 when the name or ID is blank, and otherwise
// publishes it as a "POST" update. The route table registers it for both POST
// and PUT requests.
func beaconsAddHandler(writer *kafka.Writer) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var inBeacon model.Beacon
		if err := json.NewDecoder(r.Body).Decode(&inBeacon); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		fmt.Println("sending POST message")

		if strings.TrimSpace(inBeacon.Name) == "" || strings.TrimSpace(inBeacon.ID) == "" {
			http.Error(w, "name and beacon_id cannot be blank", http.StatusBadRequest)
			return
		}

		apiUpdate := model.ApiUpdate{
			Method: "POST",
			Beacon: inBeacon,
		}

		if !sendKafkaMessage(writer, &apiUpdate) {
			fmt.Println("error in sending Kafka message")
			http.Error(w, "Error in sending kafka message", http.StatusInternalServerError)
			return
		}
		w.Write([]byte("ok"))
	}
}

// settingsEditHandler returns a handler that decodes settings from the request
// body, validates them with settingsCheck, and publishes them to Kafka.
func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var inSettings model.SettingsVal
		if err := json.NewDecoder(r.Body).Decode(&inSettings); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			fmt.Println("Error in decoding Settings body: ", err)
			return
		}

		if !settingsCheck(inSettings) {
			http.Error(w, "values must be greater than 0", http.StatusBadRequest)
			fmt.Println("settings values must be greater than 0")
			return
		}

		payload, err := json.Marshal(&inSettings)
		if err != nil {
			http.Error(w, "Error in encoding settings", http.StatusInternalServerError)
			fmt.Println("Error in encoding settings: ", err)
			return
		}

		if err := writer.WriteMessages(context.Background(), kafka.Message{Value: payload}); err != nil {
			fmt.Println("error in sending Kafka message: ", err)
			http.Error(w, "Error in sending kafka message", http.StatusInternalServerError)
			return
		}
		w.Write([]byte("ok"))
	}
}

// settingsCheck reports whether LocationConfidence, LastSeenThreshold and
// HASendInterval are all strictly positive.
func settingsCheck(settings model.SettingsVal) bool {
	return settings.LocationConfidence > 0 &&
		settings.LastSeenThreshold > 0 &&
		settings.HASendInterval > 0
}

// serveRedisWs returns a handler that upgrades the request to a WebSocket,
// starts push on its own goroutine and then blocks in reader until the
// connection ends.
func serveRedisWs(client *redis.Client, push func(*websocket.Conn, *redis.Client)) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ws, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			if _, ok := err.(websocket.HandshakeError); !ok {
				log.Println(err)
			}
			return
		}

		go push(ws, client)
		reader(ws)
	}
}

// serveWs returns a WebSocket handler that streams the Redis key
// "httpresults" to the client.
func serveWs(client *redis.Client) http.HandlerFunc {
	return serveRedisWs(client, writer)
}

// serveLatestBeaconsWs returns a WebSocket handler that streams the Redis key
// "latestbeacons" to the client.
func serveLatestBeaconsWs(client *redis.Client) http.HandlerFunc {
	return serveRedisWs(client, latestBeaconWriter)
}

// writer pushes the value of the Redis key "httpresults" to ws every
// pushPeriod and pings it every pingPeriod until a write or Redis read fails.
func writer(ws *websocket.Conn, client *redis.Client) {
	streamRedisKey(ws, client, "httpresults")
}

// latestBeaconWriter pushes the value of the Redis key "latestbeacons" to ws
// every pushPeriod and pings it every pingPeriod until a write or Redis read
// fails.
func latestBeaconWriter(ws *websocket.Conn, client *redis.Client) {
	streamRedisKey(ws, client, "latestbeacons")
}

// streamRedisKey is the shared write loop behind writer and
// latestBeaconWriter. It closes ws on return.
func streamRedisKey(ws *websocket.Conn, client *redis.Client, key string) {
	pingTicker := time.NewTicker(pingPeriod)
	pushTicker := time.NewTicker(pushPeriod)
	defer func() {
		pingTicker.Stop()
		pushTicker.Stop()
		ws.Close()
	}()

	for {
		select {
		case <-pushTicker.C:
			payload, err := client.Get(context.Background(), key).Result()
			switch {
			case err == redis.Nil:
				// Key not set yet: nothing to push on this tick.
				fmt.Println("no beacons list, starting empty")
			case err != nil:
				// This runs on its own goroutine, so a panic here would
				// take down the whole server; drop this connection instead.
				log.Printf("reading %q from Redis: %v", key, err)
				return
			default:
				ws.SetWriteDeadline(time.Now().Add(writeWait))
				if err := ws.WriteMessage(websocket.TextMessage, []byte(payload)); err != nil {
					return
				}
			}
		case <-pingTicker.C:
			ws.SetWriteDeadline(time.Now().Add(writeWait))
			if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
				return
			}
		}
	}
}

// reader drains and discards inbound messages so that pong frames are
// processed and the read deadline is extended. It returns, closing ws, on the
// first read error (including an expired deadline).
func reader(ws *websocket.Conn) {
	defer ws.Close()

	ws.SetReadLimit(maxMessageSize)
	ws.SetReadDeadline(time.Now().Add(pongWait))
	ws.SetPongHandler(func(string) error {
		ws.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	for {
		if _, _, err := ws.ReadMessage(); err != nil {
			break
		}
	}
}

// handleConnections returns a handler that upgrades the request to a
// WebSocket, registers the connection in clients for as long as it is open,
// and forwards every JSON message it reads to broadcast.
//
// NOTE(review): no receiver for broadcast is visible in this file and the
// channel is unbuffered, so the send below blocks until something consumes
// it. Confirm a consumer exists or add one.
func handleConnections(clients map[*websocket.Conn]bool, broadcast chan model.Message) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ws, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			// Upgrade has already replied to the client with an HTTP error.
			// A failed handshake must not terminate the whole server.
			log.Printf("websocket upgrade failed: %v", err)
			return
		}
		defer ws.Close()

		clientsMu.Lock()
		clients[ws] = true
		clientsMu.Unlock()

		defer func() {
			clientsMu.Lock()
			delete(clients, ws)
			clientsMu.Unlock()
		}()

		for {
			var msg model.Message
			if err := ws.ReadJSON(&msg); err != nil {
				log.Printf("error: %v", err)
				return
			}
			broadcast <- msg
		}
	}
}