From 7e3f5cb61f5d29460870ab8d141d75f82723de0c Mon Sep 17 00:00:00 2001 From: blazSmehov Date: Fri, 7 Nov 2025 15:37:50 +0100 Subject: [PATCH] feat: add websocket logic to server --- build/docker-compose.yml | 1 + cmd/server/main.go | 89 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/build/docker-compose.yml b/build/docker-compose.yml index ca55e08..f818453 100644 --- a/build/docker-compose.yml +++ b/build/docker-compose.yml @@ -26,6 +26,7 @@ services: - KAFKA_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 - KAFKA_LOG_DIRS=/tmp/kraft-combined-logs - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 + - KAFKA_AUTO_CREATE_TOPICS_ENABLE=true ports: - "127.0.0.1:9092:9092" diff --git a/cmd/server/main.go b/cmd/server/main.go index ada5500..f4627b9 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -6,15 +6,26 @@ import ( "fmt" "net/http" "strings" + "time" "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" "github.com/gorilla/handlers" "github.com/gorilla/mux" + "github.com/gorilla/websocket" "github.com/redis/go-redis/v9" "github.com/segmentio/kafka-go" ) +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, +} + +type Message struct { + Type string `json:"type"` + Data interface{} `json:"data"` +} + func main() { HttpServer("0.0.0.0:1902") } @@ -31,6 +42,10 @@ func HttpServer(addr string) { settingsWriter := kafkaclient.KafkaWriter("kafka:9092", "settings") defer settingsWriter.Close() + // Define if maybe ws writer should have more topics + wsWriter := kafkaclient.KafkaWriter("kafka:9092", "wsmessages") + defer wsWriter.Close() + r := mux.NewRouter() client := redis.NewClient(&redis.Options{ @@ -47,9 +62,14 @@ func HttpServer(addr string) { r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET") r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST") + // Handler for WS messages + // No point in having seperate route for each message type, better to handle different message types in one connection + r.HandleFunc("/ws/api", handleWSApi(wsWriter)) + http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) } +// Probably define value as interface and then reuse this writer in all of the functions func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate) bool { valueStr, err := json.Marshal(&value) if err != nil { @@ -195,3 +215,72 @@ func settingsCheck(settings model.SettingsVal) bool { return true } + +func wsWriter(ws *websocket.Conn) { + pingTicker := time.NewTicker(30 * time.Second) + beaconTicker := time.NewTicker(2 * time.Second) + defer func() { + pingTicker.Stop() + beaconTicker.Stop() + ws.Close() + }() + + for { + select { + case <-beaconTicker.C: + // What exactly is HTTP ... something to be used in locations + // http_results_lock.RLock() + // js, err := json.Marshal(http_results) + // http_results_lock.RUnlock() + + // if err != nil { + // js = []byte("error") + // } + + ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := ws.WriteMessage(websocket.TextMessage, []byte{1}); err != nil { + return + } + case <-pingTicker.C: + ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + return + } + } + } +} + +func handleWSApi(writer *kafka.Writer) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + ws, err := upgrader.Upgrade(w, r, nil) + if err != nil { + if _, ok := err.(websocket.HandshakeError); !ok { + fmt.Println("Error in upgrading connection: ", err) + } + return + } + defer ws.Close() + + go wsWriter(ws) + + for { + _, msgBytes, err := ws.ReadMessage() + if err != nil { + fmt.Println("Connection closed: ", err) + break + } + + // What are the WS messages even? + msg := kafka.Message{ + Value: msgBytes, + } + + fmt.Println("recieved WS message: ", msgBytes) + + if err := writer.WriteMessages(context.Background(), msg); err != nil { + fmt.Println("Error in sending WS Kafka message: ", err) + break + } + } + } +}