Przeglądaj źródła

chore: valkey cleanup

master
Blaz Smehov 1 miesiąc temu
rodzic
commit
59a5eaf365
8 zmienionych plików z 58 dodań i 98 usunięć
  1. +5
    -0
      cmd/db-testbench/.env
  2. +37
    -0
      cmd/db-testbench/main.go
  3. +7
    -0
      cmd/db-testbench/models/book.go
  4. +4
    -9
      cmd/server/main.go
  5. +0
    -35
      internal/pkg/common/appcontext/context.go
  6. +0
    -5
      internal/pkg/config/config.go
  7. +3
    -16
      internal/pkg/controller/settings_controller.go
  8. +2
    -33
      internal/pkg/service/beacon_service.go

+ 5
- 0
cmd/db-testbench/.env Wyświetl plik

@@ -0,0 +1,5 @@
DB_HOST=localhost
DB_PORT=5432
DB_USER=postgres
DB_PASSWORD=postgres
DB_NAME=go_crud_db

+ 37
- 0
cmd/db-testbench/main.go Wyświetl plik

@@ -0,0 +1,37 @@
package main

import (
"fmt"
"log"
"os"

"github.com/joho/godotenv"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)

var DB *gorm.DB

func main() {
err := godotenv.Load()
if err != nil {
log.Fatal("Error loading .env file")
}

dsn := fmt.Sprintf(
"host=%s user=%s password=%s dbname=%s port=%s sslmode=disable",
os.Getenv("DB_HOST"),
os.Getenv("DB_USER"),
os.Getenv("DB_PASSWORD"),
os.Getenv("DB_NAME"),
os.Getenv("DB_PORT"),
)

db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
if err != nil {
log.Fatal("Failed to connect to the database:", err)
}

DB = db
fmt.Println("Database connection established")
}

+ 7
- 0
cmd/db-testbench/models/book.go Wyświetl plik

@@ -0,0 +1,7 @@
package models

type Book struct {
ID uint `json:"id" gorm:"primaryKey"`
Title string `json:"title"`
Author string `json:"author"`
}

+ 4
- 9
cmd/server/main.go Wyświetl plik

@@ -65,9 +65,6 @@ func main() {
alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
slog.Info("Kafka readers topics: locevents, alertbeacons initialized")

client := appState.AddValkeyClient(cfg.ValkeyURL)
slog.Info("Valkey DB client created")

chLoc := make(chan model.HTTPLocation, 200)
chEvents := make(chan model.BeaconEvent, 500)

@@ -87,8 +84,8 @@ func main() {

r.HandleFunc("/api/beaconids", controller.GetBeaconIds(appState)).Methods("GET")

r.HandleFunc("/api/settings", controller.SettingsListController(appState, client, ctx)).Methods("GET")
r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, client, ctx)).Methods("POST")
r.HandleFunc("/api/settings", controller.SettingsListController(appState, ctx)).Methods("GET")
r.HandleFunc("/api/settings", controller.SettingsEditController(settingsWriter, appState, ctx)).Methods("POST")

wsHandler := http.HandlerFunc(serveWs(appState, ctx))
restApiHandler := handlers.CORS(originsOk, headersOk, methodsOk)(r)
@@ -114,12 +111,12 @@ eventLoop:
case <-ctx.Done():
break eventLoop
case msg := <-chLoc:
if err := service.LocationToBeaconService(msg, appState, client, ctx); err != nil {
if err := service.LocationToBeaconService(msg, appState, ctx); err != nil {
eMsg := fmt.Sprintf("Error in writing location change to beacon: %v\n", err)
slog.Error(eMsg)
}
case msg := <-chEvents:
if err := service.EventToBeaconService(msg, appState, client, ctx); err != nil {
if err := service.EventToBeaconService(msg, appState, ctx); err != nil {
eMsg := fmt.Sprintf("Error in writing event change to beacon: %v\n", err)
slog.Error(eMsg)
}
@@ -140,8 +137,6 @@ eventLoop:
appState.CleanKafkaWriters()

slog.Info("All kafka clients shutdown, starting shutdown of valkey client")
appState.CleanValkeyClient()

slog.Info("API server shutting down")
logFile.Close()
}


+ 0
- 35
internal/pkg/common/appcontext/context.go Wyświetl plik

@@ -1,14 +1,11 @@
package appcontext

import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/redis/go-redis/v9"
"github.com/segmentio/kafka-go"
)

@@ -22,7 +19,6 @@ type AppState struct {
latestList model.LatestBeaconsList
kafkaReadersList model.KafkaReadersList
kafkaWritersList model.KafkaWritersList
valkeyDB *redis.Client
}

// NewAppState creates a new application context AppState with default values
@@ -61,25 +57,6 @@ func NewAppState() *AppState {
}
}

func (m *AppState) AddValkeyClient(url string) *redis.Client {
valkeyDB := redis.NewClient(&redis.Options{
Addr: url,
Password: "",
})

m.valkeyDB = valkeyDB
return valkeyDB
}

func (m *AppState) CleanValkeyClient() {
fmt.Println("shutdown of valkey client starts")
if err := m.valkeyDB.Close(); err != nil {
fmt.Println("Error in shuting down valkey client")
}

fmt.Println("Succesfully shutting down valkey client")
}

func (m *AppState) AddKafkaWriter(kafkaUrl, topic string) *kafka.Writer {
kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(kafkaUrl),
@@ -316,15 +293,3 @@ func (m *AppState) UpdateSettings(newSettings model.SettingsVal) {

m.settings.Settings = newSettings
}

func (m *AppState) PersistSettings(client *redis.Client, ctx context.Context) {
d, err := json.Marshal(m.GetSettingsValue())
if err != nil {
fmt.Printf("Error in marshalling settings: %v", err)
return
}

if err := client.Set(ctx, "settings", d, 0).Err(); err != nil {
fmt.Printf("Error in persisting settings: %v", err)
}
}

+ 0
- 5
internal/pkg/config/config.go Wyświetl plik

@@ -9,10 +9,7 @@ type Config struct {
MQTTUser string
MQTTPass string
MQTTClientID string
DBPath string
KafkaURL string
RedisURL string
ValkeyURL string
}

// getEnv returns env var value or a default if not set.
@@ -31,8 +28,6 @@ func Load() *Config {
MQTTUser: getEnv("MQTT_USERNAME", "user"),
MQTTPass: getEnv("MQTT_PASSWORD", "pass"),
MQTTClientID: getEnv("MQTT_CLIENT_ID", "presence-detector"),
DBPath: getEnv("DB_PATH", "/data/conf/presence/presence.db"),
KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"),
ValkeyURL: getEnv("VALKEY_URL", "127.0.0.1:6379"),
}
}

+ 3
- 16
internal/pkg/controller/settings_controller.go Wyświetl plik

@@ -8,11 +8,10 @@ import (

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/redis/go-redis/v9"
"github.com/segmentio/kafka-go"
)

func SettingsEditController(writer *kafka.Writer, appstate *appcontext.AppState, client *redis.Client, ctx context.Context) http.HandlerFunc {
func SettingsEditController(writer *kafka.Writer, appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
var inSettings model.SettingsVal
@@ -47,7 +46,6 @@ func SettingsEditController(writer *kafka.Writer, appstate *appcontext.AppState,

// if all is OK persist settings
appstate.UpdateSettings(inSettings)
appstate.PersistSettings(client, ctx)

w.Write([]byte("ok"))
}
@@ -61,19 +59,8 @@ func settingsCheck(settings model.SettingsVal) bool {
return true
}

func SettingsListController(appstate *appcontext.AppState, client *redis.Client, ctx context.Context) http.HandlerFunc {
func SettingsListController(appstate *appcontext.AppState, ctx context.Context) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
v, err := client.Get(ctx, "settings").Result()
if err == redis.Nil {
msg := "No list found for key settings, starting empty"
fmt.Println(msg)
http.Error(w, "msg", 500)
} else if err != nil {
msg := fmt.Sprintf("Error in connecting to Redis: %v, key: settings returning empty map\n", err)
fmt.Println(msg)
http.Error(w, msg, 500)
} else {
w.Write([]byte(v))
}

}
}

+ 2
- 33
internal/pkg/service/beacon_service.go Wyświetl plik

@@ -2,37 +2,12 @@ package service

import (
"context"
"fmt"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/model"
"github.com/redis/go-redis/v9"
)

type RedisHashable interface {
RedisHashable() (map[string]any, error)
model.BeaconEvent | model.HTTPLocation
}

func persistBeaconValkey[T RedisHashable](id string, msg T, client *redis.Client, ctx context.Context) error {
key := fmt.Sprintf("beacon:%s", id)
hashM, err := msg.RedisHashable()
if err != nil {
fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
return err
}
if err := client.HSet(ctx, key, hashM).Err(); err != nil {
fmt.Println("Error in persisting set in Redis key: ", key)
return err
}
if err := client.SAdd(ctx, "beacons", key).Err(); err != nil {
fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err)
return err
}
return nil
}

func LocationToBeaconService(msg model.HTTPLocation, appState *appcontext.AppState, client *redis.Client, ctx context.Context) error {
func LocationToBeaconService(msg model.HTTPLocation, appState *appcontext.AppState, ctx context.Context) error {
id := msg.ID
beacon, ok := appState.GetHTTPResult(id)
if !ok {
@@ -45,14 +20,11 @@ func LocationToBeaconService(msg model.HTTPLocation, appState *appcontext.AppSta
beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation
appState.UpdateHTTPResult(id, beacon)
}
if err := persistBeaconValkey(id, msg, client, ctx); err != nil {
return err
}

return nil
}

func EventToBeaconService(msg model.BeaconEvent, appState *appcontext.AppState, client *redis.Client, ctx context.Context) error {
func EventToBeaconService(msg model.BeaconEvent, appState *appcontext.AppState, ctx context.Context) error {
id := msg.ID
beacon, ok := appState.GetHTTPResult(id)
if !ok {
@@ -64,9 +36,6 @@ func EventToBeaconService(msg model.BeaconEvent, appState *appcontext.AppState,
beacon.Event = msg.Event
appState.UpdateHTTPResult(id, beacon)
}
if err := persistBeaconValkey(id, msg, client, ctx); err != nil {
return err
}

return nil
}

Ładowanie…
Anuluj
Zapisz