Ver a proveniência

feat: add WS support

chore/proposed-structure
Blaz Smehov há 3 semanas
ascendente
cometimento
d4288edb90
2 ficheiros alterados com 106 adições e 44 eliminações
  1. +101
    -37
      cmd/server/main.go
  2. +5
    -7
      internal/pkg/model/types.go

+ 101
- 37
cmd/server/main.go Ver ficheiro

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"time"
@@ -21,11 +22,6 @@ var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}

type Message struct {
Type string `json:"type"`
Data interface{} `json:"data"`
}

func main() {
HttpServer("0.0.0.0:1902")
}
@@ -53,6 +49,13 @@ func HttpServer(addr string) {
Password: "",
})

// declare WS clients list | do I need it though? or will locations worker send message
// to kafka and then only this service (server) is being used for communication with the clients
clients := make(map[*websocket.Conn]bool)

// Declare broadcast channel
broadcast := make(chan model.Message)

// For now just add beacon DELETE / GET / POST / PUT methods
r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer)).Methods("DELETE")
r.HandleFunc("/api/beacons", beaconsListHandler(client)).Methods("GET")
@@ -64,7 +67,9 @@ func HttpServer(addr string) {

// Handler for WS messages
// No point in having seperate route for each message type, better to handle different message types in one connection
r.HandleFunc("/ws/api", handleWSApi(wsWriter))
r.HandleFunc("/ws/api/beacons", serveWs(client))
r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client))
r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast))

http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
}
@@ -216,30 +221,42 @@ func settingsCheck(settings model.SettingsVal) bool {
return true
}

func wsWriter(ws *websocket.Conn) {
pingTicker := time.NewTicker(30 * time.Second)
func serveWs(client *redis.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
if _, ok := err.(websocket.HandshakeError); !ok {
log.Println(err)
}
return
}

go writer(ws, client)
reader(ws)
}
}

func writer(ws *websocket.Conn, client *redis.Client) {
pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
beaconTicker := time.NewTicker(2 * time.Second)
defer func() {
pingTicker.Stop()
beaconTicker.Stop()
ws.Close()
}()

for {
select {
case <-beaconTicker.C:
// What exactly is HTTP ... something to be used in locations
// http_results_lock.RLock()
// js, err := json.Marshal(http_results)
// http_results_lock.RUnlock()

// if err != nil {
// js = []byte("error")
// }

ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := ws.WriteMessage(websocket.TextMessage, []byte{1}); err != nil {
return
httpresults, err := client.Get(context.Background(), "httpresults").Result()
if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
} else if err != nil {
panic(err)
} else {
ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := ws.WriteMessage(websocket.TextMessage, []byte(httpresults)); err != nil {
return
}
}
case <-pingTicker.C:
ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
@@ -250,37 +267,84 @@ func wsWriter(ws *websocket.Conn) {
}
}

func handleWSApi(writer *kafka.Writer) http.HandlerFunc {
func serveLatestBeaconsWs(client *redis.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
if _, ok := err.(websocket.HandshakeError); !ok {
fmt.Println("Error in upgrading connection: ", err)
log.Println(err)
}
return
}
defer ws.Close()

go wsWriter(ws)
go latestBeaconWriter(ws, client)
reader(ws)
}
}

for {
_, msgBytes, err := ws.ReadMessage()
if err != nil {
fmt.Println("Connection closed: ", err)
break
// This and writer can be refactored in one function
func latestBeaconWriter(ws *websocket.Conn, client *redis.Client) {
pingTicker := time.NewTicker((60 * time.Second * 9) / 10)
beaconTicker := time.NewTicker(2 * time.Second)
defer func() {
pingTicker.Stop()
beaconTicker.Stop()
ws.Close()
}()
for {
select {
case <-beaconTicker.C:
latestbeacons, err := client.Get(context.Background(), "latestbeacons").Result()
if err == redis.Nil {
fmt.Println("no beacons list, starting empty")
} else if err != nil {
panic(err)
} else {
ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := ws.WriteMessage(websocket.TextMessage, []byte(latestbeacons)); err != nil {
return
}
}

// What are the WS messages even?
msg := kafka.Message{
Value: msgBytes,
case <-pingTicker.C:
ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
return
}
}
}
}

func reader(ws *websocket.Conn) {
defer ws.Close()
ws.SetReadLimit(512)
ws.SetReadDeadline(time.Now().Add(60 * time.Second))
ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(60 * time.Second)); return nil })
for {
_, _, err := ws.ReadMessage()
if err != nil {
break
}
}
}

fmt.Println("recieved WS message: ", msgBytes)
func handleConnections(clients map[*websocket.Conn]bool, broadcast chan model.Message) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Fatal(err)
}
defer ws.Close()
clients[ws] = true

if err := writer.WriteMessages(context.Background(), msg); err != nil {
fmt.Println("Error in sending WS Kafka message: ", err)
for {
var msg model.Message
err := ws.ReadJSON(&msg)
if err != nil {
log.Printf("error: %v", err)
delete(clients, ws)
break
}
broadcast <- msg
}
}
}

+ 5
- 7
internal/pkg/model/types.go Ver ficheiro

@@ -104,12 +104,6 @@ type HAMessage struct {
Distance float64 `json:"distance"`
}

// HTTPLocationsList aggregates all beacon HTTP states.
type HTTPLocationsList struct {
Beacons []HTTPLocation `json:"beacons"`
//Buttons []Button `json:"buttons"`
}

// Beacon holds all relevant information about a tracked beacon device.
type Beacon struct {
Name string `json:"name"`
@@ -182,6 +176,10 @@ type LatestBeaconsList struct {
Lock sync.RWMutex
}

type HTTPLocationsList struct {
Beacons []HTTPLocation `json:"beacons"`
}

type HTTPResultsList struct {
HTTPResultsLock sync.RWMutex
HTTPResults HTTPLocationsList
@@ -192,10 +190,10 @@ type AppContext struct {
Beacons BeaconsList
ButtonsList map[string]Button
Settings Settings
Clients map[*websocket.Conn]bool
Broadcast chan Message
Locations LocationsList
LatestList LatestBeaconsList
Clients map[*websocket.Conn]bool
}

type ApiUpdate struct {


Carregando…
Cancelar
Guardar