From 6a85901ff0537676fcd00854a320016e368de1e9 Mon Sep 17 00:00:00 2001 From: blazSmehov Date: Fri, 7 Nov 2025 10:56:12 +0100 Subject: [PATCH] chore: add lock to settings type, refactor code --- cmd/decoder/main.go | 17 ++++++++++------- cmd/server/main.go | 4 ++-- internal/pkg/model/types.go | 7 ++++++- internal/pkg/mqttclient/beacon.go | 2 +- internal/pkg/mqttclient/location.go | 10 +++++----- internal/pkg/mqttclient/processor.go | 4 ++-- internal/pkg/redis/redis.go | 15 +++++++++++++-- 7 files changed, 39 insertions(+), 20 deletions(-) diff --git a/cmd/decoder/main.go b/cmd/decoder/main.go index e864793..9d4070d 100644 --- a/cmd/decoder/main.go +++ b/cmd/decoder/main.go @@ -37,11 +37,13 @@ func main() { LatestList: make(map[string]model.Beacon), }, Settings: model.Settings{ - Location_confidence: 4, - Last_seen_threshold: 15, - Beacon_metrics_size: 30, - HA_send_interval: 5, - HA_send_changes_only: false, + Settings: model.SettingsVal{ + Location_confidence: 4, + Last_seen_threshold: 15, + Beacon_metrics_size: 30, + HA_send_interval: 5, + HA_send_changes_only: false, + }, }, } @@ -82,13 +84,13 @@ func main() { appCtx.LatestList.LatestList = latestList settings := presenseredis.LoadSettings(client, ctx) - appCtx.Settings = settings + appCtx.Settings.Settings = settings // declare channel for collecting Kafka messages chRaw := make(chan model.Incoming_json, 2000) chApi := make(chan model.ApiUpdate, 2000) chLatest := make(chan model.Incoming_json, 2000) - chSettings := make(chan model.Settings, 10) + chSettings := make(chan model.SettingsVal, 10) go kafkaclient.Consume(rawReader, chRaw) go kafkaclient.Consume(apiReader, chApi) @@ -103,6 +105,7 @@ func main() { for range ticker.C { presenseredis.SaveBeaconsList(&appCtx, client, ctx) presenseredis.SaveLatestList(&appCtx, client, ctx) + presenseredis.SaveSettings(&appCtx, client, ctx) } }() diff --git a/cmd/server/main.go b/cmd/server/main.go index 52e913e..04f2f17 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -156,7 +156,7 @@ func settingsListHandler(client *redis.Client) http.HandlerFunc { func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { decoder := json.NewDecoder(r.Body) - var inSettings model.Settings + var inSettings model.SettingsVal if err := decoder.Decode(&inSettings); err != nil { http.Error(w, err.Error(), 400) fmt.Println("Error in decoding Settings body: ", err) @@ -190,7 +190,7 @@ func settingsEditHandler(writer *kafka.Writer) http.HandlerFunc { } } -func settingsCheck(settings model.Settings) bool { +func settingsCheck(settings model.SettingsVal) bool { if settings.Location_confidence <= 0 || settings.Last_seen_threshold <= 0 || settings.HA_send_interval <= 0 { return false } diff --git a/internal/pkg/model/types.go b/internal/pkg/model/types.go index 26a8007..e87aa3e 100644 --- a/internal/pkg/model/types.go +++ b/internal/pkg/model/types.go @@ -8,7 +8,7 @@ import ( ) // Settings defines configuration parameters for presence detection behavior. -type Settings struct { +type SettingsVal struct { Location_confidence int64 `json:"location_confidence"` Last_seen_threshold int64 `json:"last_seen_threshold"` Beacon_metrics_size int `json:"beacon_metrics_size"` @@ -16,6 +16,11 @@ type Settings struct { HA_send_changes_only bool `json:"ha_send_changes_only"` } +type Settings struct { + Settings SettingsVal + Lock sync.RWMutex +} + // Incoming_json represents the JSON payload received from beacon messages. type Incoming_json struct { Hostname string `json:"hostname"` diff --git a/internal/pkg/mqttclient/beacon.go b/internal/pkg/mqttclient/beacon.go index e43084a..3f69614 100644 --- a/internal/pkg/mqttclient/beacon.go +++ b/internal/pkg/mqttclient/beacon.go @@ -41,7 +41,7 @@ func updateLatestList(incoming model.Incoming_json, now int64, latestList *model } } -func updateBeaconData(beacon *model.Beacon, incoming model.Incoming_json, now int64, cl *client.Client, settings *model.Settings) { +func updateBeaconData(beacon *model.Beacon, incoming model.Incoming_json, now int64, cl *client.Client, settings *model.SettingsVal) { beacon.Incoming_JSON = incoming beacon.Last_seen = now beacon.Beacon_type = incoming.Beacon_type diff --git a/internal/pkg/mqttclient/location.go b/internal/pkg/mqttclient/location.go index 535c927..7101aa0 100644 --- a/internal/pkg/mqttclient/location.go +++ b/internal/pkg/mqttclient/location.go @@ -11,7 +11,7 @@ import ( "github.com/yosssi/gmq/mqtt/client" ) -func getLikelyLocations(settings *model.Settings, ctx *model.AppContext, cl *client.Client) { +func getLikelyLocations(settings *model.SettingsVal, ctx *model.AppContext, cl *client.Client) { ctx.HTTPResults.HTTPResultsLock.Lock() defer ctx.HTTPResults.HTTPResultsLock.Unlock() ctx.HTTPResults.HTTPResults = model.HTTPLocationsList{Beacons: []model.HTTPLocation{}} @@ -41,7 +41,7 @@ func getLikelyLocations(settings *model.Settings, ctx *model.AppContext, cl *cli } } -func isExpired(b *model.Beacon, s *model.Settings) bool { +func isExpired(b *model.Beacon, s *model.SettingsVal) bool { return time.Now().Unix()-b.Beacon_metrics[len(b.Beacon_metrics)-1].Timestamp > s.Last_seen_threshold } @@ -76,7 +76,7 @@ func calculateBestLocation(b *model.Beacon) model.BestLocation { return model.BestLocation{Name: bestName, Distance: last.Distance, Last_seen: last.Timestamp} } -func updateBeaconState(b *model.Beacon, best model.BestLocation, s *model.Settings, ctx *model.AppContext, cl *client.Client) { +func updateBeaconState(b *model.Beacon, best model.BestLocation, s *model.SettingsVal, ctx *model.AppContext, cl *client.Client) { updateLocationHistory(b, best.Name) updateConfidence(b, best.Name, s) @@ -94,7 +94,7 @@ func updateLocationHistory(b *model.Beacon, loc string) { } } -func updateConfidence(b *model.Beacon, loc string, s *model.Settings) { +func updateConfidence(b *model.Beacon, loc string, s *model.SettingsVal) { counts := map[string]int{} for _, l := range b.Location_history { counts[l]++ @@ -117,7 +117,7 @@ func updateConfidence(b *model.Beacon, loc string, s *model.Settings) { } } -func locationChanged(b *model.Beacon, best model.BestLocation, s *model.Settings) bool { +func locationChanged(b *model.Beacon, best model.BestLocation, s *model.SettingsVal) bool { return (b.Location_confidence == s.Location_confidence && b.Previous_confident_location != best.Name) || b.Expired_location == "expired" diff --git a/internal/pkg/mqttclient/processor.go b/internal/pkg/mqttclient/processor.go index 41d311f..eedb5f1 100644 --- a/internal/pkg/mqttclient/processor.go +++ b/internal/pkg/mqttclient/processor.go @@ -25,7 +25,7 @@ func runProcessor(ticker *time.Ticker, cl *client.Client, ch <-chan model.Incomi for { select { case <-ticker.C: - getLikelyLocations(&ctx.Settings, ctx, cl) + getLikelyLocations(&ctx.Settings.Settings, ctx, cl) case incoming := <-ch: ProcessIncoming(incoming, cl, ctx) } @@ -49,7 +49,7 @@ func ProcessIncoming(incoming model.Incoming_json, cl *client.Client, ctx *model defer beacons.Lock.Unlock() latestList := &ctx.LatestList - settings := &ctx.Settings + settings := &ctx.Settings.Settings beacon, ok := beacons.Beacons[id] if !ok { diff --git a/internal/pkg/redis/redis.go b/internal/pkg/redis/redis.go index fb7e906..5da9ad7 100644 --- a/internal/pkg/redis/redis.go +++ b/internal/pkg/redis/redis.go @@ -39,9 +39,9 @@ func LoadLatestList(client *redis.Client, ctx context.Context) map[string]model. return latestMap } -func LoadSettings(client *redis.Client, ctx context.Context) model.Settings { +func LoadSettings(client *redis.Client, ctx context.Context) model.SettingsVal { redisSettings, err := client.Get(ctx, "settings").Result() - var settings model.Settings + var settings model.SettingsVal if err == redis.Nil { fmt.Println("no beacons list, starting empty") @@ -75,3 +75,14 @@ func SaveLatestList(appCtx *model.AppContext, client *redis.Client, ctx context. fmt.Println("error in saving to redis: ", err) } } + +func SaveSettings(appCtx *model.AppContext, client *redis.Client, ctx context.Context) { + appCtx.LatestList.Lock.Lock() + data, _ := json.Marshal(appCtx.LatestList.LatestList) + appCtx.LatestList.Lock.Unlock() + + err := client.Set(ctx, "latestList", data, 0).Err() + if err != nil { + fmt.Println("error in saving to redis: ", err) + } +}