From d4288edb90a84661b47a9205ccb16196446171d4 Mon Sep 17 00:00:00 2001 From: blazSmehov Date: Sun, 9 Nov 2025 18:55:20 +0100 Subject: [PATCH] feat: add WS support --- cmd/server/main.go | 138 ++++++++++++++++++++++++++---------- internal/pkg/model/types.go | 12 ++-- 2 files changed, 106 insertions(+), 44 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index f4627b9..2e0291d 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "log" "net/http" "strings" "time" @@ -21,11 +22,6 @@ var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } -type Message struct { - Type string `json:"type"` - Data interface{} `json:"data"` -} - func main() { HttpServer("0.0.0.0:1902") } @@ -53,6 +49,13 @@ func HttpServer(addr string) { Password: "", }) + // declare WS clients list | do I need it though? or will locations worker send message + // to kafka and then only this service (server) is being used for communication with the clients + clients := make(map[*websocket.Conn]bool) + + // Declare broadcast channel + broadcast := make(chan model.Message) + // For now just add beacon DELETE / GET / POST / PUT methods r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE") r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET") @@ -64,7 +67,9 @@ func HttpServer(addr string) { // Handler for WS messages // No point in having seperate route for each message type, better to handle different message types in one connection - r.HandleFunc("/ws/api", handleWSApi(wsWriter)) + r.HandleFunc("/ws/api/beacons", serveWs(client)) + r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client)) + r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast)) http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) } @@ -216,30 +221,42 @@ func settingsCheck(settings model.SettingsVal) bool { return true } -func wsWriter(ws *websocket.Conn) { - pingTicker := time.NewTicker(30 * time.Second) +func serveWs(client *redis.Client) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + if _, ok := err.(websocket.HandshakeError); !ok { + log.Println(err) + } + return + } + + go writer(ws, client) + reader(ws) + } +} + +func writer(ws *websocket.Conn, client *redis.Client) { + pingTicker := time.NewTicker((60 * time.Second * 9) / 10) beaconTicker := time.NewTicker(2 * time.Second) defer func() { pingTicker.Stop() beaconTicker.Stop() ws.Close() }() - for { select { case <-beaconTicker.C: - // What exactly is HTTP ... something to be used in locations - // http_results_lock.RLock() - // js, err := json.Marshal(http_results) - // http_results_lock.RUnlock() - - // if err != nil { - // js = []byte("error") - // } - - ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) - if err := ws.WriteMessage(websocket.TextMessage, []byte{1}); err != nil { - return + httpresults, err := client.Get(context.Background(), "httpresults").Result() + if err == redis.Nil { + fmt.Println("no beacons list, starting empty") + } else if err != nil { + panic(err) + } else { + ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := ws.WriteMessage(websocket.TextMessage, []byte(httpresults)); err != nil { + return + } } case <-pingTicker.C: ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) @@ -250,37 +267,84 @@ func wsWriter(ws *websocket.Conn) { } } -func handleWSApi(writer *kafka.Writer) http.HandlerFunc { +func serveLatestBeaconsWs(client *redis.Client) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ws, err := upgrader.Upgrade(w, r, nil) if err != nil { if _, ok := err.(websocket.HandshakeError); !ok { - fmt.Println("Error in upgrading connection: ", err) + log.Println(err) } return } - defer ws.Close() - go wsWriter(ws) + go latestBeaconWriter(ws, client) + reader(ws) + } +} - for { - _, msgBytes, err := ws.ReadMessage() - if err != nil { - fmt.Println("Connection closed: ", err) - break +// This and writer can be refactored in one function +func latestBeaconWriter(ws *websocket.Conn, client *redis.Client) { + pingTicker := time.NewTicker((60 * time.Second * 9) / 10) + beaconTicker := time.NewTicker(2 * time.Second) + defer func() { + pingTicker.Stop() + beaconTicker.Stop() + ws.Close() + }() + for { + select { + case <-beaconTicker.C: + latestbeacons, err := client.Get(context.Background(), "latestbeacons").Result() + if err == redis.Nil { + fmt.Println("no beacons list, starting empty") + } else if err != nil { + panic(err) + } else { + ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := ws.WriteMessage(websocket.TextMessage, []byte(latestbeacons)); err != nil { + return + } } - - // What are the WS messages even? - msg := kafka.Message{ - Value: msgBytes, + case <-pingTicker.C: + ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + return } + } + } +} + +func reader(ws *websocket.Conn) { + defer ws.Close() + ws.SetReadLimit(512) + ws.SetReadDeadline(time.Now().Add(60 * time.Second)) + ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(60 * time.Second)); return nil }) + for { + _, _, err := ws.ReadMessage() + if err != nil { + break + } + } +} - fmt.Println("recieved WS message: ", msgBytes) +func handleConnections(clients map[*websocket.Conn]bool, broadcast chan model.Message) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Fatal(err) + } + defer ws.Close() + clients[ws] = true - if err := writer.WriteMessages(context.Background(), msg); err != nil { - fmt.Println("Error in sending WS Kafka message: ", err) + for { + var msg model.Message + err := ws.ReadJSON(&msg) + if err != nil { + log.Printf("error: %v", err) + delete(clients, ws) break } + broadcast <- msg } } } diff --git a/internal/pkg/model/types.go b/internal/pkg/model/types.go index e87aa3e..3bf4a90 100644 --- a/internal/pkg/model/types.go +++ b/internal/pkg/model/types.go @@ -104,12 +104,6 @@ type HAMessage struct { Distance float64 `json:"distance"` } -// HTTPLocationsList aggregates all beacon HTTP states. -type HTTPLocationsList struct { - Beacons []HTTPLocation `json:"beacons"` - //Buttons []Button `json:"buttons"` -} - // Beacon holds all relevant information about a tracked beacon device. type Beacon struct { Name string `json:"name"` @@ -182,6 +176,10 @@ type LatestBeaconsList struct { Lock sync.RWMutex } +type HTTPLocationsList struct { + Beacons []HTTPLocation `json:"beacons"` +} + type HTTPResultsList struct { HTTPResultsLock sync.RWMutex HTTPResults HTTPLocationsList @@ -192,10 +190,10 @@ type AppContext struct { Beacons BeaconsList ButtonsList map[string]Button Settings Settings - Clients map[*websocket.Conn]bool Broadcast chan Message Locations LocationsList LatestList LatestBeaconsList + Clients map[*websocket.Conn]bool } type ApiUpdate struct {