From 28d70bea24149c694156ed04c2795e652662517b Mon Sep 17 00:00:00 2001 From: blazSmehov Date: Mon, 1 Dec 2025 14:10:26 +0100 Subject: [PATCH] feat: gracefully close kafka connections on docker sigterm signal --- cmd/bridge/main.go | 4 +- cmd/decoder/main.go | 35 +++-- cmd/location/main.go | 34 +++-- cmd/server/main.go | 159 +++++++++++----------- internal/pkg/common/appcontext/context.go | 79 ++++++++++- internal/pkg/kafkaclient/consumer.go | 32 +++-- internal/pkg/kafkaclient/reader.go | 3 + internal/pkg/kafkaclient/writer.go | 3 + internal/pkg/model/types.go | 12 ++ 9 files changed, 244 insertions(+), 117 deletions(-) diff --git a/cmd/bridge/main.go b/cmd/bridge/main.go index 0bcfb22..e821e40 100644 --- a/cmd/bridge/main.go +++ b/cmd/bridge/main.go @@ -29,11 +29,11 @@ func main() { }) if err != nil { - fmt.Println("Could not connect to MQTT broker") + fmt.Printf("Could not connect to MQTT broker, addr: %s", cfg.MQTTHost) panic(err) } - fmt.Println("Successfuly connected to MQTT broker") + fmt.Printf("Successfuly connected to MQTT broker, addr: %s", cfg.MQTTHost) writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "rawbeacons") defer writer.Close() diff --git a/cmd/decoder/main.go b/cmd/decoder/main.go index faf515f..b5d805e 100644 --- a/cmd/decoder/main.go +++ b/cmd/decoder/main.go @@ -5,7 +5,10 @@ import ( "context" "encoding/hex" "fmt" + "os/signal" "strings" + "sync" + "syscall" "github.com/AFASystems/presence/internal/pkg/common/appcontext" "github.com/AFASystems/presence/internal/pkg/common/utils" @@ -15,31 +18,36 @@ import ( "github.com/segmentio/kafka-go" ) +var wg sync.WaitGroup + func main() { // Load global context to init beacons and latest list appState := appcontext.NewAppState() cfg := config.Load() - // Kafka reader for Raw MQTT beacons - rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw") - defer rawReader.Close() + // define context + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) + 
defer stop() - // Kafka reader for API server updates - apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api") - defer apiReader.Close() + rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw") + apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "gid-api") - alertWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "alertbeacons") - defer alertWriter.Close() + alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons") fmt.Println("Decoder initialized, subscribed to Kafka topics") + chRaw := make(chan model.BeaconAdvertisement, 2000) chApi := make(chan model.ApiUpdate, 2000) - go kafkaclient.Consume(rawReader, chRaw) - go kafkaclient.Consume(apiReader, chApi) + wg.Add(2) + go kafkaclient.Consume(rawReader, chRaw, ctx, &wg) + go kafkaclient.Consume(apiReader, chApi, ctx, &wg) +eventloop: for { select { + case <-ctx.Done(): + break eventloop case msg := <-chRaw: processIncoming(msg, appState, alertWriter) case msg := <-chApi: @@ -52,6 +60,13 @@ func main() { } } } + + fmt.Println("broken out of the main event loop") + wg.Wait() + + fmt.Println("All go routines have stopped, Beggining to close Kafka connections") + appState.CleanKafkaReaders() + appState.CleanKafkaWriters() } func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer) { diff --git a/cmd/location/main.go b/cmd/location/main.go index 03f5155..69669d5 100644 --- a/cmd/location/main.go +++ b/cmd/location/main.go @@ -4,6 +4,9 @@ import ( "context" "encoding/json" "fmt" + "os/signal" + "sync" + "syscall" "time" "github.com/AFASystems/presence/internal/pkg/common/appcontext" @@ -14,21 +17,21 @@ import ( "github.com/segmentio/kafka-go" ) +var wg sync.WaitGroup + func main() { // Load global context to init beacons and latest list appState := appcontext.NewAppState() cfg := config.Load() - // Kafka reader for Raw MQTT beacons - rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc") 
- defer rawReader.Close() + // Define context + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) + defer stop() - // Kafka reader for API server updates - apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api-loc") - defer apiReader.Close() + rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc") + apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "gid-api-loc") - writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "locevents") - defer writer.Close() + writer := appState.AddKafkaWriter(cfg.KafkaURL, "locevents") fmt.Println("Locations algorithm initialized, subscribed to Kafka topics") @@ -38,11 +41,15 @@ func main() { chRaw := make(chan model.BeaconAdvertisement, 2000) chApi := make(chan model.ApiUpdate, 2000) - go kafkaclient.Consume(rawReader, chRaw) - go kafkaclient.Consume(apiReader, chApi) + wg.Add(2) + go kafkaclient.Consume(rawReader, chRaw, ctx, &wg) + go kafkaclient.Consume(apiReader, chApi, ctx, &wg) +eventLoop: for { select { + case <-ctx.Done(): + break eventLoop case <-locTicker.C: getLikelyLocations(appState, writer) case msg := <-chRaw: @@ -58,6 +65,13 @@ func main() { } } } + + fmt.Println("broken out of the main event loop") + wg.Wait() + + fmt.Println("All go routines have stopped, Beggining to close Kafka connections") + appState.CleanKafkaReaders() + appState.CleanKafkaWriters() } func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { diff --git a/cmd/server/main.go b/cmd/server/main.go index 53bd0fc..49208f3 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -6,7 +6,10 @@ import ( "fmt" "log" "net/http" + "os/signal" "strings" + "sync" + "syscall" "time" "github.com/AFASystems/presence/internal/pkg/common/appcontext" @@ -24,102 +27,38 @@ var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } -func main() { - HttpServer("0.0.0.0:1902") -} +var wg sync.WaitGroup -func 
HttpServer(addr string) { +func main() { cfg := config.Load() appState := appcontext.NewAppState() + // define context + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) + defer stop() + headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"}) originsOk := handlers.AllowedOrigins([]string{"*"}) methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) - // Kafka writer that relays messages - writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "apibeacons") - defer writer.Close() - - settingsWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "settings") - defer settingsWriter.Close() + writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons") + settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings") - // Kafka reader for Raw MQTT beacons - locationReader := kafkaclient.KafkaReader(cfg.KafkaURL, "locevents", "gid-loc-serv") - defer locationReader.Close() - - // Kafka reader for API server updates - alertsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") - defer alertsReader.Close() + locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server") + alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") client := redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", Password: "", }) - ctx := context.Background() - // Separate channel for location change? 
chLoc := make(chan model.HTTPLocation, 200) chEvents := make(chan model.BeaconEvent, 500) - go kafkaclient.Consume(locationReader, chLoc) - go kafkaclient.Consume(alertsReader, chEvents) - - go func() { - for { - select { - case msg := <-chLoc: - beacon, ok := appState.GetBeacon(msg.ID) - if !ok { - appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation}) - } else { - beacon.ID = msg.ID - beacon.Location = msg.Location - beacon.Distance = msg.Distance - beacon.LastSeen = msg.LastSeen - beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation - appState.UpdateBeacon(msg.ID, beacon) - } - key := fmt.Sprintf("beacon:%s", msg.ID) - hashM, err := msg.RedisHashable() - if err != nil { - fmt.Println("Error in converting location into hashmap for Redis insert: ", err) - continue - } - if err := client.HSet(ctx, key, hashM).Err(); err != nil { - fmt.Println("Error in persisting set in Redis key: ", key) - continue - } - if err := client.SAdd(ctx, "beacons", key).Err(); err != nil { - fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err) - } - case msg := <-chEvents: - beacon, ok := appState.GetBeacon(msg.ID) - if !ok { - appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, BeaconType: msg.Type, HSBattery: int64(msg.Battery), Event: msg.Event}) - } else { - beacon.ID = msg.ID - beacon.BeaconType = msg.Type - beacon.HSBattery = int64(msg.Battery) - beacon.Event = msg.Event - appState.UpdateBeacon(msg.ID, beacon) - } - key := fmt.Sprintf("beacon:%s", msg.ID) - hashM, err := msg.RedisHashable() - if err != nil { - fmt.Println("Error in converting location into hashmap for Redis insert: ", err) - continue - } - if err := client.HSet(ctx, key, hashM).Err(); err != nil { - fmt.Println("Error in persisting set in Redis key: ", key) - continue - } - if err := client.SAdd(ctx, "beacons", key).Err(); err != nil { - 
fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err) - } - } - } - }() + wg.Add(2) + go kafkaclient.Consume(locationReader, chLoc, ctx, &wg) + go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg) r := mux.NewRouter() @@ -146,7 +85,71 @@ func HttpServer(addr string) { // r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client)) r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast)) - http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) + http.ListenAndServe("0.0.0.0:1902", handlers.CORS(originsOk, headersOk, methodsOk)(r)) + +eventLoop: + for { + select { + case <-ctx.Done(): + break eventLoop + case msg := <-chLoc: + beacon, ok := appState.GetBeacon(msg.ID) + if !ok { + appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation}) + } else { + beacon.ID = msg.ID + beacon.Location = msg.Location + beacon.Distance = msg.Distance + beacon.LastSeen = msg.LastSeen + beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation + appState.UpdateBeacon(msg.ID, beacon) + } + key := fmt.Sprintf("beacon:%s", msg.ID) + hashM, err := msg.RedisHashable() + if err != nil { + fmt.Println("Error in converting location into hashmap for Redis insert: ", err) + continue + } + if err := client.HSet(ctx, key, hashM).Err(); err != nil { + fmt.Println("Error in persisting set in Redis key: ", key) + continue + } + if err := client.SAdd(ctx, "beacons", key).Err(); err != nil { + fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err) + } + case msg := <-chEvents: + beacon, ok := appState.GetBeacon(msg.ID) + if !ok { + appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, BeaconType: msg.Type, HSBattery: int64(msg.Battery), Event: msg.Event}) + } else { + beacon.ID = msg.ID + beacon.BeaconType = msg.Type + beacon.HSBattery = int64(msg.Battery) + 
beacon.Event = msg.Event + appState.UpdateBeacon(msg.ID, beacon) + } + key := fmt.Sprintf("beacon:%s", msg.ID) + hashM, err := msg.RedisHashable() + if err != nil { + fmt.Println("Error in converting location into hashmap for Redis insert: ", err) + continue + } + if err := client.HSet(ctx, key, hashM).Err(); err != nil { + fmt.Println("Error in persisting set in Redis key: ", key) + continue + } + if err := client.SAdd(ctx, "beacons", key).Err(); err != nil { + fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err) + } + } + } + + fmt.Println("broken out of the main event loop") + wg.Wait() + + fmt.Println("All go routines have stopped, Beginning to close Kafka connections") + appState.CleanKafkaReaders() + appState.CleanKafkaWriters() } func beaconsListSingleHandler(appstate *appcontext.AppState) http.HandlerFunc { diff --git a/internal/pkg/common/appcontext/context.go b/internal/pkg/common/appcontext/context.go index dc8c157..7a259af 100644 --- a/internal/pkg/common/appcontext/context.go +++ b/internal/pkg/common/appcontext/context.go @@ -1,16 +1,23 @@ package appcontext import ( + "fmt" + "strings" + "time" + "github.com/AFASystems/presence/internal/pkg/model" + "github.com/segmentio/kafka-go" ) // AppState provides centralized access to application state type AppState struct { - beacons model.BeaconsList - settings model.Settings - beaconEvents model.BeaconEventList - beaconsLookup map[string]struct{} - latestList model.LatestBeaconsList + beacons model.BeaconsList + settings model.Settings + beaconEvents model.BeaconEventList + beaconsLookup map[string]struct{} + latestList model.LatestBeaconsList + kafkaReadersList model.KafkaReadersList + kafkaWritersList model.KafkaWritersList } // NewAppState creates a new application context AppState with default values @@ -37,7 +44,69 @@ func NewAppState() *AppState { latestList: model.LatestBeaconsList{ LatestList: make(map[string]model.Beacon), }, + kafkaReadersList:
model.KafkaReadersList{ + KafkaReaders: make([]*kafka.Reader, 0), + }, + kafkaWritersList: model.KafkaWritersList{ + KafkaWriters: make([]*kafka.Writer, 0), + }, + } +} + +func (m *AppState) AddKafkaWriter(kafkaUrl, topic string) *kafka.Writer { + kafkaWriter := &kafka.Writer{ + Addr: kafka.TCP(kafkaUrl), + Topic: topic, + Balancer: &kafka.LeastBytes{}, + Async: false, + RequiredAcks: kafka.RequireAll, + BatchSize: 100, + BatchTimeout: 10 * time.Millisecond, + } + + m.kafkaWritersList.KafkaWritersLock.Lock() + m.kafkaWritersList.KafkaWriters = append(m.kafkaWritersList.KafkaWriters, kafkaWriter) + m.kafkaWritersList.KafkaWritersLock.Unlock() + + return kafkaWriter +} + +func (m *AppState) CleanKafkaWriters() { + fmt.Println("clean of kafka writers starts") + for _, r := range m.kafkaWritersList.KafkaWriters { + if err := r.Close(); err != nil { + fmt.Printf("Error in closing kafka writer %v", err) + } + } + + fmt.Println("Kafka writers graceful cleanup complete") +} + +func (m *AppState) AddKafkaReader(kafkaUrl, topic, groupID string) *kafka.Reader { + brokers := strings.Split(kafkaUrl, ",") + kafkaReader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: brokers, + GroupID: groupID, + Topic: topic, + MinBytes: 1, + MaxBytes: 10e6, + }) + + m.kafkaReadersList.KafkaReadersLock.Lock() + m.kafkaReadersList.KafkaReaders = append(m.kafkaReadersList.KafkaReaders, kafkaReader) + m.kafkaReadersList.KafkaReadersLock.Unlock() + + return kafkaReader +} + +func (m *AppState) CleanKafkaReaders() { + for _, r := range m.kafkaReadersList.KafkaReaders { + if err := r.Close(); err != nil { + fmt.Printf("Error in closing kafka reader %v", err) + } } + + fmt.Println("Kafka readers graceful cleanup complete") } // GetBeacons returns thread-safe access to beacons list diff --git a/internal/pkg/kafkaclient/consumer.go b/internal/pkg/kafkaclient/consumer.go index 9e6b8c5..a5dda6b 100644 --- a/internal/pkg/kafkaclient/consumer.go +++ b/internal/pkg/kafkaclient/consumer.go @@ -4,24 +4,32 @@
import ( "context" "encoding/json" "fmt" + "sync" "github.com/segmentio/kafka-go" ) -func Consume[T any](r *kafka.Reader, ch chan<- T) { +func Consume[T any](r *kafka.Reader, ch chan<- T, ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() for { - msg, err := r.ReadMessage(context.Background()) - if err != nil { - fmt.Println("error reading message:", err) - continue - } + select { + case <-ctx.Done(): + fmt.Println("consumer closed") + return + default: + msg, err := r.ReadMessage(ctx) + if err != nil { + fmt.Println("error reading message:", err) + continue + } - var data T - if err := json.Unmarshal(msg.Value, &data); err != nil { - fmt.Println("error decoding:", err) - continue - } + var data T + if err := json.Unmarshal(msg.Value, &data); err != nil { + fmt.Println("error decoding:", err) + continue + } - ch <- data + ch <- data + } } } diff --git a/internal/pkg/kafkaclient/reader.go b/internal/pkg/kafkaclient/reader.go index dbd5e07..d836f22 100644 --- a/internal/pkg/kafkaclient/reader.go +++ b/internal/pkg/kafkaclient/reader.go @@ -6,6 +6,9 @@ import ( "github.com/segmentio/kafka-go" ) +// Create Kafka reader +// +// Deprecated: Use context manager object instead func KafkaReader(kafkaURL, topic, groupID string) *kafka.Reader { brokers := strings.Split(kafkaURL, ",") return kafka.NewReader(kafka.ReaderConfig{ diff --git a/internal/pkg/kafkaclient/writer.go b/internal/pkg/kafkaclient/writer.go index bffa957..adee4fa 100644 --- a/internal/pkg/kafkaclient/writer.go +++ b/internal/pkg/kafkaclient/writer.go @@ -6,6 +6,9 @@ import ( "github.com/segmentio/kafka-go" ) +// Create Kafka writer +// +// Deprecated: Use context manager object instead func KafkaWriter(kafkaURL, topic string) *kafka.Writer { return &kafka.Writer{ Addr: kafka.TCP(kafkaURL), diff --git a/internal/pkg/model/types.go b/internal/pkg/model/types.go index 31b4ab6..a2edb37 100644 --- a/internal/pkg/model/types.go +++ b/internal/pkg/model/types.go @@ -2,6 +2,8 @@ package model import ( 
"sync" + + "github.com/segmentio/kafka-go" ) // Settings defines configuration parameters for presence detection behavior. @@ -198,5 +200,15 @@ type ApiUpdate struct { ID string } +type KafkaReadersList struct { + KafkaReadersLock sync.RWMutex + KafkaReaders []*kafka.Reader +} + +type KafkaWritersList struct { + KafkaWritersLock sync.RWMutex + KafkaWriters []*kafka.Writer +} + var HTTPHostPathPtr *string var HTTPWSHostPathPtr *string