diff --git a/cmd/server/main.go b/cmd/server/main.go index 49208f3..3105fc5 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -47,10 +47,7 @@ func main() { locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server") alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") - client := redis.NewClient(&redis.Options{ - Addr: "127.0.0.1:6379", - Password: "", - }) + client := appState.AddValkeyClient(cfg.ValkeyURL) // Separate channel for location change? chLoc := make(chan model.HTTPLocation, 200) @@ -150,6 +147,8 @@ eventLoop: fmt.Println("All go routines have stopped, Beggining to close Kafka connections") appState.CleanKafkaReaders() appState.CleanKafkaWriters() + fmt.Println("All kafka clients shutdown, starting shutdown of valkey client") + appState.CleanValkeyClient() } func beaconsListSingleHandler(appstate *appcontext.AppState) http.HandlerFunc { diff --git a/internal/pkg/common/appcontext/context.go b/internal/pkg/common/appcontext/context.go index 7a259af..f626c28 100644 --- a/internal/pkg/common/appcontext/context.go +++ b/internal/pkg/common/appcontext/context.go @@ -6,6 +6,7 @@ import ( "time" "github.com/AFASystems/presence/internal/pkg/model" + "github.com/redis/go-redis/v9" "github.com/segmentio/kafka-go" ) @@ -18,6 +19,7 @@ type AppState struct { latestList model.LatestBeaconsList kafkaReadersList model.KafkaReadersList kafkaWritersList model.KafkaWritersList + valkeyDB *redis.Client } // NewAppState creates a new application context AppState with default values @@ -53,6 +55,25 @@ func NewAppState() *AppState { } } +func (m *AppState) AddValkeyClient(url string) *redis.Client { + valkeyDB := redis.NewClient(&redis.Options{ + Addr: url, + Password: "", + }) + + m.valkeyDB = valkeyDB + return valkeyDB +} + +func (m *AppState) CleanValkeyClient() { + fmt.Println("shutdown of valkey client starts") + if err := m.valkeyDB.Close(); err != nil { + fmt.Println("Error in shuting down valkey client") + } + + fmt.Println("Succesfully shutting down valkey client") +} + func (m *AppState) AddKafkaWriter(kafkaUrl, topic string) *kafka.Writer { kafkaWriter := &kafka.Writer{ Addr: kafka.TCP(kafkaUrl), @@ -72,14 +93,14 @@ func (m *AppState) AddKafkaWriter(kafkaUrl, topic string) *kafka.Writer { } func (m *AppState) CleanKafkaWriters() { - fmt.Println("clean of kafka readers starts") + fmt.Println("shutdown of kafka readers starts") for _, r := range m.kafkaWritersList.KafkaWriters { if err := r.Close(); err != nil { fmt.Printf("Error in closing kafka writer %v", err) } } - fmt.Println("Kafka writers graceful cleanup complete") + fmt.Println("Kafka writers graceful shutdown complete") } func (m *AppState) AddKafkaReader(kafkaUrl, topic, groupID string) *kafka.Reader { @@ -106,7 +127,7 @@ func (m *AppState) CleanKafkaReaders() { } } - fmt.Println("Kafka readers graceful cleanup complete") + fmt.Println("Kafka readers graceful shutdown complete") } // GetBeacons returns thread-safe access to beacons list diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index 62bd5f8..f7c5b96 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -12,6 +12,7 @@ type Config struct { DBPath string KafkaURL string RedisURL string + ValkeyURL string } // getEnv returns env var value or a default if not set. @@ -32,5 +33,6 @@ func Load() *Config { MQTTClientID: getEnv("MQTT_CLIENT_ID", "presence-detector"), DBPath: getEnv("DB_PATH", "/data/conf/presence/presence.db"), KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"), + ValkeyURL: getEnv("VALKEY_URL", "127.0.0.1:6379"), } }