From 979563eec4caaf6f1e36e50b707a001b5f739c30 Mon Sep 17 00:00:00 2001 From: blazSmehov Date: Tue, 2 Dec 2025 15:36:13 +0100 Subject: [PATCH] feat: settings controller, chore: refactor api structure --- cmd/decoder/main.go | 2 +- cmd/location/main.go | 7 +- cmd/server/main.go | 357 +----------------- internal/pkg/common/appcontext/context.go | 14 + internal/pkg/config/config.go | 2 +- internal/pkg/controller/beacons_controller.go | 114 ++++++ .../pkg/controller/settings_controller.go | 79 ++++ internal/pkg/redis/redis.go | 6 + internal/pkg/service/beacon_service.go | 72 ++++ 9 files changed, 308 insertions(+), 345 deletions(-) create mode 100644 internal/pkg/controller/beacons_controller.go create mode 100644 internal/pkg/controller/settings_controller.go create mode 100644 internal/pkg/service/beacon_service.go diff --git a/cmd/decoder/main.go b/cmd/decoder/main.go index f951b5a..d9d80c0 100644 --- a/cmd/decoder/main.go +++ b/cmd/decoder/main.go @@ -37,7 +37,7 @@ func main() { fmt.Println("Decoder initialized, subscribed to Kafka topics") chRaw := make(chan model.BeaconAdvertisement, 2000) - chApi := make(chan model.ApiUpdate, 2000) + chApi := make(chan model.ApiUpdate, 200) wg.Add(2) go kafkaclient.Consume(rawReader, chRaw, ctx, &wg) diff --git a/cmd/location/main.go b/cmd/location/main.go index ebd8195..47cebf3 100644 --- a/cmd/location/main.go +++ b/cmd/location/main.go @@ -30,6 +30,7 @@ func main() { rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc") apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "gid-api-loc") + settingsReader := appState.AddKafkaReader(cfg.KafkaURL, "settings", "gid-settings-loc") writer := appState.AddKafkaWriter(cfg.KafkaURL, "locevents") @@ -40,10 +41,12 @@ func main() { chRaw := make(chan model.BeaconAdvertisement, 2000) chApi := make(chan model.ApiUpdate, 2000) + chSettings := make(chan model.SettingsVal, 5) - wg.Add(2) + wg.Add(3) go kafkaclient.Consume(rawReader, chRaw, ctx, &wg) go kafkaclient.Consume(apiReader, chApi, ctx, &wg) + go kafkaclient.Consume(settingsReader, chSettings, ctx, &wg) eventLoop: for { @@ -65,6 +68,8 @@ eventLoop: appState.RemoveBeaconFromLookup(id) fmt.Println("Beacon removed from lookup: ", id) } + case msg := <-chSettings: + appState.UpdateSettings(msg) } } diff --git a/cmd/server/main.go b/cmd/server/main.go index 5444e6e..1e0d148 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -2,31 +2,22 @@ package main import ( "context" - "encoding/json" "fmt" - "log" "net/http" "os/signal" - "strings" "sync" "syscall" - "time" "github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/config" + "github.com/AFASystems/presence/internal/pkg/controller" "github.com/AFASystems/presence/internal/pkg/kafkaclient" "github.com/AFASystems/presence/internal/pkg/model" + "github.com/AFASystems/presence/internal/pkg/service" "github.com/gorilla/handlers" "github.com/gorilla/mux" - "github.com/gorilla/websocket" - "github.com/redis/go-redis/v9" - "github.com/segmentio/kafka-go" ) -var upgrader = websocket.Upgrader{ - CheckOrigin: func(r *http.Request) bool { return true }, -} - var wg sync.WaitGroup func main() { @@ -48,8 +39,8 @@ func main() { alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") client := appState.AddValkeyClient(cfg.ValkeyURL) + // Need Lua script to pull all of the beacons in one go on init - // Separate channel for location change? chLoc := make(chan model.HTTPLocation, 200) chEvents := make(chan model.BeaconEvent, 500) @@ -60,16 +51,16 @@ func main() { r := mux.NewRouter() // For now just add beacon DELETE / GET / POST / PUT methods - r.HandleFunc("/api/beacons/{beacon_id}", beaconsDeleteHandler(writer, ctx)).Methods("DELETE") - r.HandleFunc("/api/beacons", beaconsListController(appState)).Methods("GET") - r.HandleFunc("/api/beacons/{beacon_id}", beaconsListSingleController(appState)).Methods("GET") - r.HandleFunc("/api/beacons", beaconsAddHandler(writer, ctx)).Methods("POST") - r.HandleFunc("/api/beacons", beaconsAddHandler(writer, ctx)).Methods("PUT") + r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsDeleteController(writer, ctx)).Methods("DELETE") + r.HandleFunc("/api/beacons", controller.BeaconsListController(appState)).Methods("GET") + r.HandleFunc("/api/beacons/{beacon_id}", controller.BeaconsListSingleController(appState)).Methods("GET") + r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx)).Methods("POST") + r.HandleFunc("/api/beacons", controller.BeaconsAddController(writer, ctx)).Methods("PUT") - // r.HandleFunc("/api/settings", settingsListHandler(client)).Methods("GET") - r.HandleFunc("/api/settings", settingsEditHandler(settingsWriter)).Methods("POST") + r.HandleFunc("/api/settings", controller.SettingsListController(appState, client, ctx)).Methods("GET") + r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, client, ctx)).Methods("POST") - http.ListenAndServe("0.0.0.0:1902", handlers.CORS(originsOk, headersOk, methodsOk)(r)) + http.ListenAndServe(cfg.HTTPAddr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) eventLoop: for { @@ -77,33 +68,12 @@ eventLoop: case <-ctx.Done(): break eventLoop case msg := <-chLoc: - beacon, ok := appState.GetBeacon(msg.ID) - if !ok { - appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation}) - } else { - beacon.ID = msg.ID - beacon.Location = msg.Location - beacon.Distance = msg.Distance - beacon.LastSeen = msg.LastSeen - beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation - appState.UpdateBeacon(msg.ID, beacon) - } - if err := persistBeaconValkey(msg.ID, msg, client, ctx); err != nil { - fmt.Printf("Error in persisting change location beacon: %v", err) + if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil { + fmt.Printf("Error in writing location change to beacon: %v\n", err) } case msg := <-chEvents: - beacon, ok := appState.GetBeacon(msg.ID) - if !ok { - appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, BeaconType: msg.Type, HSBattery: int64(msg.Battery), Event: msg.Event}) - } else { - beacon.ID = msg.ID - beacon.BeaconType = msg.Type - beacon.HSBattery = int64(msg.Battery) - beacon.Event = msg.Event - appState.UpdateBeacon(msg.ID, beacon) - } - if err := persistBeaconValkey(msg.ID, msg, client, ctx); err != nil { - fmt.Printf("Error in persisting change event beacon: %v", err) + if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil { + fmt.Printf("Error in writing event change to beacon: %v\n", err) } } } @@ -117,300 +87,3 @@ eventLoop: fmt.Println("All kafka clients shutdown, starting shutdown of valkey client") appState.CleanValkeyClient() } - -type RedisHashable interface { - RedisHashable() (map[string]interface{}, error) - model.BeaconEvent | model.HTTPLocation -} - -func persistBeaconValkey[T RedisHashable](id string, msg T, client *redis.Client, ctx context.Context) error { - key := fmt.Sprintf("beacon:%s", id) - hashM, err := msg.RedisHashable() - if err != nil { - fmt.Println("Error in converting location into hashmap for Redis insert: ", err) - return err - } - if err := client.HSet(ctx, key, hashM).Err(); err != nil { - fmt.Println("Error in persisting set in Redis key: ", key) - return err - } - if err := client.SAdd(ctx, "beacons", key).Err(); err != nil { - fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err) - return err - } - return nil -} - -// Makes no sense to define service as there is no actual business logic -func beaconsListSingleController(appstate *appcontext.AppState) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - id := vars["beacon_id"] - beacon, ok := appstate.GetBeacon(id) - if !ok { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusNotFound) - json.NewEncoder(w).Encode(map[string]string{"error": "Beacon not found"}) - return - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(beacon) - } -} - -func beaconsListController(appstate *appcontext.AppState) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - beacons := appstate.GetAllBeacons() - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(beacons) - } -} - -// Probably define value as interface and then reuse this writer in all of the functions -func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate, ctx context.Context) error { - valueStr, err := json.Marshal(&value) - if err != nil { - fmt.Println("error in encoding: ", err) - return err - } - msg := kafka.Message{ - Value: valueStr, - } - - if err := writer.WriteMessages(ctx, msg); err != nil { - fmt.Println("Error in sending kafka message: ", err) - return err - } - - return nil -} - -func beaconsDeleteHandler(writer *kafka.Writer, ctx context.Context) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - beaconId := vars["beacon_id"] - apiUpdate := model.ApiUpdate{ - Method: "DELETE", - ID: beaconId, - } - - fmt.Println("Sending DELETE message") - - if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil { - fmt.Println("error in sending Kafka DELETE message") - http.Error(w, "Error in sending kafka message", 500) - return - } - - w.Write([]byte("ok")) - } -} - -func beaconsAddHandler(writer *kafka.Writer, ctx context.Context) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - decoder := json.NewDecoder(r.Body) - var inBeacon model.Beacon - err := decoder.Decode(&inBeacon) - - if err != nil { - http.Error(w, err.Error(), 400) - return - } - - fmt.Println("sending POST message") - - if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.ID)) == 0) { - http.Error(w, "name and beacon_id cannot be blank", 400) - return - } - - apiUpdate := model.ApiUpdate{ - Method: "POST", - Beacon: inBeacon, - } - - if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil { - fmt.Println("error in sending Kafka POST message") - http.Error(w, "Error in sending kafka message", 500) - return - } - - w.Write([]byte("ok")) - } -} - -func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - decoder := json.NewDecoder(r.Body) - var inSettings model.SettingsVal - if err := decoder.Decode(&inSettings); err != nil { - http.Error(w, err.Error(), 400) - fmt.Println("Error in decoding Settings body: ", err) - return - } - - if !settingsCheck(inSettings) { - http.Error(w, "values must be greater than 0", 400) - fmt.Println("settings values must be greater than 0") - return - } - - valueStr, err := json.Marshal(&inSettings) - if err != nil { - http.Error(w, "Error in encoding settings", 500) - fmt.Println("Error in encoding settings: ", err) - return - } - - msg := kafka.Message{ - Value: valueStr, - } - - if err := writer.WriteMessages(context.Background(), msg); err != nil { - fmt.Println("error in sending Kafka message") - http.Error(w, "Error in sending kafka message", 500) - return - } - - w.Write([]byte("ok")) - } -} - -func settingsCheck(settings model.SettingsVal) bool { - if settings.LocationConfidence <= 0 || settings.LastSeenThreshold <= 0 || settings.HASendInterval <= 0 { - return false - } - - return true -} - -func serveWs(client *redis.Client) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - ws, err := upgrader.Upgrade(w, r, nil) - if err != nil { - if _, ok := err.(websocket.HandshakeError); !ok { - log.Println(err) - } - return - } - - go writer(ws, client) - reader(ws) - } -} - -func writer(ws *websocket.Conn, client *redis.Client) { - pingTicker := time.NewTicker((60 * time.Second * 9) / 10) - beaconTicker := time.NewTicker(2 * time.Second) - defer func() { - pingTicker.Stop() - beaconTicker.Stop() - ws.Close() - }() - for { - select { - case <-beaconTicker.C: - httpresults, err := client.Get(context.Background(), "httpresults").Result() - if err == redis.Nil { - fmt.Println("no beacons list, starting empty") - } else if err != nil { - panic(err) - } else { - ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) - if err := ws.WriteMessage(websocket.TextMessage, []byte(httpresults)); err != nil { - return - } - } - case <-pingTicker.C: - ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) - if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { - return - } - } - } -} - -func serveLatestBeaconsWs(client *redis.Client) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - ws, err := upgrader.Upgrade(w, r, nil) - if err != nil { - if _, ok := err.(websocket.HandshakeError); !ok { - log.Println(err) - } - return - } - - go latestBeaconWriter(ws, client) - reader(ws) - } -} - -// This and writer can be refactored in one function -func latestBeaconWriter(ws *websocket.Conn, client *redis.Client) { - pingTicker := time.NewTicker((60 * time.Second * 9) / 10) - beaconTicker := time.NewTicker(2 * time.Second) - defer func() { - pingTicker.Stop() - beaconTicker.Stop() - ws.Close() - }() - for { - select { - case <-beaconTicker.C: - latestbeacons, err := client.Get(context.Background(), "latestbeacons").Result() - if err == redis.Nil { - fmt.Println("no beacons list, starting empty") - } else if err != nil { - panic(err) - } else { - ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) - if err := ws.WriteMessage(websocket.TextMessage, []byte(latestbeacons)); err != nil { - return - } - } - case <-pingTicker.C: - ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) - if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil { - return - } - } - } -} - -func reader(ws *websocket.Conn) { - defer ws.Close() - ws.SetReadLimit(512) - ws.SetReadDeadline(time.Now().Add(60 * time.Second)) - ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(60 * time.Second)); return nil }) - for { - _, _, err := ws.ReadMessage() - if err != nil { - break - } - } -} - -func handleConnections(clients map[*websocket.Conn]bool, broadcast chan model.Message) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - ws, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Fatal(err) - } - defer ws.Close() - clients[ws] = true - - for { - var msg model.Message - err := ws.ReadJSON(&msg) - if err != nil { - log.Printf("error: %v", err) - delete(clients, ws) - break - } - broadcast <- msg - } - } -} diff --git a/internal/pkg/common/appcontext/context.go b/internal/pkg/common/appcontext/context.go index f626c28..f60774e 100644 --- a/internal/pkg/common/appcontext/context.go +++ b/internal/pkg/common/appcontext/context.go @@ -1,6 +1,8 @@ package appcontext import ( + "context" + "encoding/json" "fmt" "strings" "time" @@ -269,3 +271,15 @@ func (m *AppState) UpdateSettings(newSettings model.SettingsVal) { m.settings.Settings = newSettings } + +func (m *AppState) PersistSettings(client *redis.Client, ctx context.Context) { + d, err := json.Marshal(m.GetSettingsValue()) + if err != nil { + fmt.Printf("Error in marshalling settings: %v", err) + return + } + + if err := client.Set(ctx, "settings", d, 0).Err(); err != nil { + fmt.Printf("Error in persisting settings: %v", err) + } +} diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index f7c5b96..85e69a6 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -25,7 +25,7 @@ func getEnv(key, def string) string { func Load() *Config { return &Config{ - HTTPAddr: getEnv("HTTP_HOST_PATH", "0.0.0.0:8080"), + HTTPAddr: getEnv("HTTP_HOST_PATH", "0.0.0.0:1902"), WSAddr: getEnv("HTTPWS_HOST_PATH", "0.0.0.0:8088"), MQTTHost: getEnv("MQTT_HOST", "192.168.1.101:1883"), MQTTUser: getEnv("MQTT_USERNAME", "user"), diff --git a/internal/pkg/controller/beacons_controller.go b/internal/pkg/controller/beacons_controller.go new file mode 100644 index 0000000..e092407 --- /dev/null +++ b/internal/pkg/controller/beacons_controller.go @@ -0,0 +1,114 @@ +package controller + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/AFASystems/presence/internal/pkg/common/appcontext" + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/gorilla/mux" + "github.com/segmentio/kafka-go" +) + +func BeaconsListSingleController(appstate *appcontext.AppState) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + id := vars["beacon_id"] + beacon, ok := appstate.GetBeacon(id) + if !ok { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusNotFound) + json.NewEncoder(w).Encode(map[string]string{"error": "Beacon not found"}) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(beacon) + } +} + +func BeaconsListController(appstate *appcontext.AppState) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + beacons := appstate.GetAllBeacons() + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(beacons) + } +} + +// Probably define value as interface and then reuse this writer in all of the functions +func sendKafkaMessage(writer *kafka.Writer, value *model.ApiUpdate, ctx context.Context) error { + valueStr, err := json.Marshal(&value) + if err != nil { + fmt.Println("error in encoding: ", err) + return err + } + msg := kafka.Message{ + Value: valueStr, + } + + if err := writer.WriteMessages(ctx, msg); err != nil { + fmt.Println("Error in sending kafka message: ", err) + return err + } + + return nil +} + +func BeaconsDeleteController(writer *kafka.Writer, ctx context.Context) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + beaconId := vars["beacon_id"] + apiUpdate := model.ApiUpdate{ + Method: "DELETE", + ID: beaconId, + } + + fmt.Printf("Sending DELETE beacon id: %s message\n", beaconId) + + if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil { + fmt.Println("error in sending Kafka DELETE message") + http.Error(w, "Error in sending kafka message", 500) + return + } + + w.Write([]byte("ok")) + } +} + +func BeaconsAddController(writer *kafka.Writer, ctx context.Context) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + decoder := json.NewDecoder(r.Body) + var inBeacon model.Beacon + err := decoder.Decode(&inBeacon) + + if err != nil { + http.Error(w, err.Error(), 400) + return + } + + if (len(strings.TrimSpace(inBeacon.Name)) == 0) || (len(strings.TrimSpace(inBeacon.ID)) == 0) { + http.Error(w, "name and beacon_id cannot be blank", 400) + return + } + + fmt.Printf("sending POST beacon id: %s message\n", inBeacon.ID) + + apiUpdate := model.ApiUpdate{ + Method: "POST", + Beacon: inBeacon, + } + + if err := sendKafkaMessage(writer, &apiUpdate, ctx); err != nil { + fmt.Println("error in sending Kafka POST message") + http.Error(w, "Error in sending kafka message", 500) + return + } + + w.Write([]byte("ok")) + } +} diff --git a/internal/pkg/controller/settings_controller.go b/internal/pkg/controller/settings_controller.go new file mode 100644 index 0000000..6cd8207 --- /dev/null +++ b/internal/pkg/controller/settings_controller.go @@ -0,0 +1,79 @@ +package controller + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/AFASystems/presence/internal/pkg/common/appcontext" + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/redis/go-redis/v9" + "github.com/segmentio/kafka-go" +) + +func SettingsEditController(writer *kafka.Writer, appstate *appcontext.AppState, client *redis.Client, ctx context.Context) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + decoder := json.NewDecoder(r.Body) + var inSettings model.SettingsVal + if err := decoder.Decode(&inSettings); err != nil { + http.Error(w, err.Error(), 400) + fmt.Println("Error in decoding Settings body: ", err) + return + } + + if !settingsCheck(inSettings) { + http.Error(w, "values must be greater than 0", 400) + fmt.Println("settings values must be greater than 0") + return + } + + valueStr, err := json.Marshal(&inSettings) + if err != nil { + http.Error(w, "Error in encoding settings", 500) + fmt.Println("Error in encoding settings: ", err) + return + } + + msg := kafka.Message{ + Value: valueStr, + } + + if err := writer.WriteMessages(ctx, msg); err != nil { + fmt.Println("error in sending Kafka message") + http.Error(w, "Error in sending kafka message", 500) + return + } + + // if all is OK persist settings + appstate.UpdateSettings(inSettings) + appstate.PersistSettings(client, ctx) + + w.Write([]byte("ok")) + } +} + +func settingsCheck(settings model.SettingsVal) bool { + if settings.LocationConfidence <= 0 || settings.LastSeenThreshold <= 0 || settings.HASendInterval <= 0 { + return false + } + + return true +} + +func SettingsListController(appstate *appcontext.AppState, client *redis.Client, ctx context.Context) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + v, err := client.Get(ctx, "settings").Result() + if err == redis.Nil { + msg := "No list found for key settings, starting empty" + fmt.Println(msg) + http.Error(w, "msg", 500) + } else if err != nil { + msg := fmt.Sprintf("Error in connecting to Redis: %v, key: settings returning empty map\n", err) + fmt.Println(msg) + http.Error(w, msg, 500) + } else { + w.Write([]byte(v)) + } + } +} diff --git a/internal/pkg/redis/redis.go b/internal/pkg/redis/redis.go index edabe7f..9e87fa2 100644 --- a/internal/pkg/redis/redis.go +++ b/internal/pkg/redis/redis.go @@ -8,6 +8,9 @@ import ( "github.com/redis/go-redis/v9" ) +// Get Map from Redis +// +// Deprecated: there is only one map now and we know the type func LoadRedisMap[K comparable, V any, M map[K]V](client *redis.Client, ctx context.Context, key string) M { redisValue, err := client.Get(ctx, key).Result() resMap := make(M) @@ -25,6 +28,9 @@ func LoadRedisMap[K comparable, V any, M map[K]V](client *redis.Client, ctx cont return resMap } +// Set Map in Redis +// +// Deprecated: hashmaps are used now func SaveRedisMap(client *redis.Client, ctx context.Context, key string, data interface{}) { eData, err := json.Marshal(data) if err != nil { diff --git a/internal/pkg/service/beacon_service.go b/internal/pkg/service/beacon_service.go new file mode 100644 index 0000000..690c4e9 --- /dev/null +++ b/internal/pkg/service/beacon_service.go @@ -0,0 +1,72 @@ +package service + +import ( + "context" + "fmt" + + "github.com/AFASystems/presence/internal/pkg/common/appcontext" + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/redis/go-redis/v9" +) + +type RedisHashable interface { + RedisHashable() (map[string]any, error) + model.BeaconEvent | model.HTTPLocation +} + +func persistBeaconValkey[T RedisHashable](id string, msg T, client *redis.Client, ctx context.Context) error { + key := fmt.Sprintf("beacon:%s", id) + hashM, err := msg.RedisHashable() + if err != nil { + fmt.Println("Error in converting location into hashmap for Redis insert: ", err) + return err + } + if err := client.HSet(ctx, key, hashM).Err(); err != nil { + fmt.Println("Error in persisting set in Redis key: ", key) + return err + } + if err := client.SAdd(ctx, "beacons", key).Err(); err != nil { + fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err) + return err + } + return nil +} + +func LocationToBeaconService(msg model.HTTPLocation, appState *appcontext.AppState, client *redis.Client, ctx context.Context) error { + id := msg.ID + beacon, ok := appState.GetBeacon(id) + if !ok { + appState.UpdateBeacon(id, model.Beacon{ID: id, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation}) + } else { + beacon.ID = id + beacon.Location = msg.Location + beacon.Distance = msg.Distance + beacon.LastSeen = msg.LastSeen + beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation + appState.UpdateBeacon(id, beacon) + } + if err := persistBeaconValkey(id, msg, client, ctx); err != nil { + return err + } + + return nil +} + +func EventToBeaconService(msg model.BeaconEvent, appState *appcontext.AppState, client *redis.Client, ctx context.Context) error { + id := msg.ID + beacon, ok := appState.GetBeacon(id) + if !ok { + appState.UpdateBeacon(id, model.Beacon{ID: id, BeaconType: msg.Type, HSBattery: int64(msg.Battery), Event: msg.Event}) + } else { + beacon.ID = id + beacon.BeaconType = msg.Type + beacon.HSBattery = int64(msg.Battery) + beacon.Event = msg.Event + appState.UpdateBeacon(id, beacon) + } + if err := persistBeaconValkey(id, msg, client, ctx); err != nil { + return err + } + + return nil +}