Selaa lähdekoodia

feat: gracefully close kafka connections on docker sigterm signal

master
Blaz Smehov 4 päivää sitten
vanhempi
commit
28d70bea24
9 muutettua tiedostoa, joissa 244 lisäystä ja 117 poistoa
  1. +2
    -2
      cmd/bridge/main.go
  2. +25
    -10
      cmd/decoder/main.go
  3. +24
    -10
      cmd/location/main.go
  4. +81
    -78
      cmd/server/main.go
  5. +74
    -5
      internal/pkg/common/appcontext/context.go
  6. +20
    -12
      internal/pkg/kafkaclient/consumer.go
  7. +3
    -0
      internal/pkg/kafkaclient/reader.go
  8. +3
    -0
      internal/pkg/kafkaclient/writer.go
  9. +12
    -0
      internal/pkg/model/types.go

+ 2
- 2
cmd/bridge/main.go Näytä tiedosto

@@ -29,11 +29,11 @@ func main() {
})

if err != nil {
fmt.Println("Could not connect to MQTT broker")
fmt.Printf("Could not connect to MQTT broker, addr: %s", cfg.MQTTHost)
panic(err)
}

fmt.Println("Successfuly connected to MQTT broker")
fmt.Printf("Successfuly connected to MQTT broker, addr: %s", cfg.MQTTHost)

writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "rawbeacons")
defer writer.Close()


+ 25
- 10
cmd/decoder/main.go Näytä tiedosto

@@ -5,7 +5,10 @@ import (
"context"
"encoding/hex"
"fmt"
"os/signal"
"strings"
"sync"
"syscall"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
"github.com/AFASystems/presence/internal/pkg/common/utils"
@@ -15,31 +18,36 @@ import (
"github.com/segmentio/kafka-go"
)

var wg sync.WaitGroup

func main() {
// Load global context to init beacons and latest list
appState := appcontext.NewAppState()
cfg := config.Load()

// Kafka reader for Raw MQTT beacons
rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw")
defer rawReader.Close()
// define context
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()

// Kafka reader for API server updates
apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api")
defer apiReader.Close()
rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw")
apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "gid-api")

alertWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "alertbeacons")
defer alertWriter.Close()
alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons")

fmt.Println("Decoder initialized, subscribed to Kafka topics")

chRaw := make(chan model.BeaconAdvertisement, 2000)
chApi := make(chan model.ApiUpdate, 2000)

go kafkaclient.Consume(rawReader, chRaw)
go kafkaclient.Consume(apiReader, chApi)
wg.Add(2)
go kafkaclient.Consume(rawReader, chRaw, ctx, &wg)
go kafkaclient.Consume(apiReader, chApi, ctx, &wg)

eventloop:
for {
select {
case <-ctx.Done():
break eventloop
case msg := <-chRaw:
processIncoming(msg, appState, alertWriter)
case msg := <-chApi:
@@ -52,6 +60,13 @@ func main() {
}
}
}

fmt.Println("broken out of the main event loop")
wg.Wait()

fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()
}

func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer) {


+ 24
- 10
cmd/location/main.go Näytä tiedosto

@@ -4,6 +4,9 @@ import (
"context"
"encoding/json"
"fmt"
"os/signal"
"sync"
"syscall"
"time"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
@@ -14,21 +17,21 @@ import (
"github.com/segmentio/kafka-go"
)

var wg sync.WaitGroup

func main() {
// Load global context to init beacons and latest list
appState := appcontext.NewAppState()
cfg := config.Load()

// Kafka reader for Raw MQTT beacons
rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc")
defer rawReader.Close()
// Define context
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()

// Kafka reader for API server updates
apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api-loc")
defer apiReader.Close()
rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc")
apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "gid-api-loc")

writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "locevents")
defer writer.Close()
writer := appState.AddKafkaWriter(cfg.KafkaURL, "locevents")

fmt.Println("Locations algorithm initialized, subscribed to Kafka topics")

@@ -38,11 +41,15 @@ func main() {
chRaw := make(chan model.BeaconAdvertisement, 2000)
chApi := make(chan model.ApiUpdate, 2000)

go kafkaclient.Consume(rawReader, chRaw)
go kafkaclient.Consume(apiReader, chApi)
wg.Add(2)
go kafkaclient.Consume(rawReader, chRaw, ctx, &wg)
go kafkaclient.Consume(apiReader, chApi, ctx, &wg)

eventLoop:
for {
select {
case <-ctx.Done():
break eventLoop
case <-locTicker.C:
getLikelyLocations(appState, writer)
case msg := <-chRaw:
@@ -58,6 +65,13 @@ func main() {
}
}
}

fmt.Println("broken out of the main event loop")
wg.Wait()

fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()
}

func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) {


+ 81
- 78
cmd/server/main.go Näytä tiedosto

@@ -6,7 +6,10 @@ import (
"fmt"
"log"
"net/http"
"os/signal"
"strings"
"sync"
"syscall"
"time"

"github.com/AFASystems/presence/internal/pkg/common/appcontext"
@@ -24,102 +27,38 @@ var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}

func main() {
HttpServer("0.0.0.0:1902")
}
var wg sync.WaitGroup

func HttpServer(addr string) {
func main() {
cfg := config.Load()
appState := appcontext.NewAppState()

// define context
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()

headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"})
originsOk := handlers.AllowedOrigins([]string{"*"})
methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"})

// Kafka writer that relays messages
writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "apibeacons")
defer writer.Close()

settingsWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "settings")
defer settingsWriter.Close()
writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons")
settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings")

// Kafka reader for Raw MQTT beacons
locationReader := kafkaclient.KafkaReader(cfg.KafkaURL, "locevents", "gid-loc-serv")
defer locationReader.Close()

// Kafka reader for API server updates
alertsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")
defer alertsReader.Close()
locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server")
alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv")

client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
})

ctx := context.Background()

// Separate channel for location change?
chLoc := make(chan model.HTTPLocation, 200)
chEvents := make(chan model.BeaconEvent, 500)

go kafkaclient.Consume(locationReader, chLoc)
go kafkaclient.Consume(alertsReader, chEvents)

go func() {
for {
select {
case msg := <-chLoc:
beacon, ok := appState.GetBeacon(msg.ID)
if !ok {
appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation})
} else {
beacon.ID = msg.ID
beacon.Location = msg.Location
beacon.Distance = msg.Distance
beacon.LastSeen = msg.LastSeen
beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation
appState.UpdateBeacon(msg.ID, beacon)
}
key := fmt.Sprintf("beacon:%s", msg.ID)
hashM, err := msg.RedisHashable()
if err != nil {
fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
continue
}
if err := client.HSet(ctx, key, hashM).Err(); err != nil {
fmt.Println("Error in persisting set in Redis key: ", key)
continue
}
if err := client.SAdd(ctx, "beacons", key).Err(); err != nil {
fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err)
}
case msg := <-chEvents:
beacon, ok := appState.GetBeacon(msg.ID)
if !ok {
appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, BeaconType: msg.Type, HSBattery: int64(msg.Battery), Event: msg.Event})
} else {
beacon.ID = msg.ID
beacon.BeaconType = msg.Type
beacon.HSBattery = int64(msg.Battery)
beacon.Event = msg.Event
appState.UpdateBeacon(msg.ID, beacon)
}
key := fmt.Sprintf("beacon:%s", msg.ID)
hashM, err := msg.RedisHashable()
if err != nil {
fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
continue
}
if err := client.HSet(ctx, key, hashM).Err(); err != nil {
fmt.Println("Error in persisting set in Redis key: ", key)
continue
}
if err := client.SAdd(ctx, "beacons", key).Err(); err != nil {
fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err)
}
}
}
}()
wg.Add(2)
go kafkaclient.Consume(locationReader, chLoc, ctx, &wg)
go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg)

r := mux.NewRouter()

@@ -146,7 +85,71 @@ func HttpServer(addr string) {
// r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client))
r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast))

http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r))
http.ListenAndServe("0.0.0.0:1902", handlers.CORS(originsOk, headersOk, methodsOk)(r))

eventLoop:
for {
select {
case <-ctx.Done():
break eventLoop
case msg := <-chLoc:
beacon, ok := appState.GetBeacon(msg.ID)
if !ok {
appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation})
} else {
beacon.ID = msg.ID
beacon.Location = msg.Location
beacon.Distance = msg.Distance
beacon.LastSeen = msg.LastSeen
beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation
appState.UpdateBeacon(msg.ID, beacon)
}
key := fmt.Sprintf("beacon:%s", msg.ID)
hashM, err := msg.RedisHashable()
if err != nil {
fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
continue
}
if err := client.HSet(ctx, key, hashM).Err(); err != nil {
fmt.Println("Error in persisting set in Redis key: ", key)
continue
}
if err := client.SAdd(ctx, "beacons", key).Err(); err != nil {
fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err)
}
case msg := <-chEvents:
beacon, ok := appState.GetBeacon(msg.ID)
if !ok {
appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, BeaconType: msg.Type, HSBattery: int64(msg.Battery), Event: msg.Event})
} else {
beacon.ID = msg.ID
beacon.BeaconType = msg.Type
beacon.HSBattery = int64(msg.Battery)
beacon.Event = msg.Event
appState.UpdateBeacon(msg.ID, beacon)
}
key := fmt.Sprintf("beacon:%s", msg.ID)
hashM, err := msg.RedisHashable()
if err != nil {
fmt.Println("Error in converting location into hashmap for Redis insert: ", err)
continue
}
if err := client.HSet(ctx, key, hashM).Err(); err != nil {
fmt.Println("Error in persisting set in Redis key: ", key)
continue
}
if err := client.SAdd(ctx, "beacons", key).Err(); err != nil {
fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err)
}
}
}

fmt.Println("broken out of the main event loop")
wg.Wait()

fmt.Println("All go routines have stopped, Beggining to close Kafka connections")
appState.CleanKafkaReaders()
appState.CleanKafkaWriters()
}

func beaconsListSingleHandler(appstate *appcontext.AppState) http.HandlerFunc {


+ 74
- 5
internal/pkg/common/appcontext/context.go Näytä tiedosto

@@ -1,16 +1,23 @@
package appcontext

import (
"fmt"
"strings"
"time"

"github.com/AFASystems/presence/internal/pkg/model"
"github.com/segmentio/kafka-go"
)

// AppState provides centralized access to application state
type AppState struct {
beacons model.BeaconsList
settings model.Settings
beaconEvents model.BeaconEventList
beaconsLookup map[string]struct{}
latestList model.LatestBeaconsList
beacons model.BeaconsList
settings model.Settings
beaconEvents model.BeaconEventList
beaconsLookup map[string]struct{}
latestList model.LatestBeaconsList
kafkaReadersList model.KafkaReadersList
kafkaWritersList model.KafkaWritersList
}

// NewAppState creates a new application context AppState with default values
@@ -37,7 +44,69 @@ func NewAppState() *AppState {
latestList: model.LatestBeaconsList{
LatestList: make(map[string]model.Beacon),
},
kafkaReadersList: model.KafkaReadersList{
KafkaReaders: make([]*kafka.Reader, 0),
},
kafkaWritersList: model.KafkaWritersList{
KafkaWriters: make([]*kafka.Writer, 0),
},
}
}

// AddKafkaWriter builds a synchronous kafka.Writer for the given topic,
// registers it in the app state so CleanKafkaWriters can close it during
// graceful shutdown, and returns it to the caller.
func (m *AppState) AddKafkaWriter(kafkaURL, topic string) *kafka.Writer {
	w := &kafka.Writer{
		Addr:         kafka.TCP(kafkaURL),
		Topic:        topic,
		Balancer:     &kafka.LeastBytes{},
		Async:        false,
		RequiredAcks: kafka.RequireAll,
		BatchSize:    100,
		BatchTimeout: 10 * time.Millisecond,
	}

	m.kafkaWritersList.KafkaWritersLock.Lock()
	defer m.kafkaWritersList.KafkaWritersLock.Unlock()
	m.kafkaWritersList.KafkaWriters = append(m.kafkaWritersList.KafkaWriters, w)

	return w
}

// CleanKafkaWriters closes every Kafka writer registered via AddKafkaWriter.
// Intended to run once during graceful shutdown, after all producing
// goroutines have finished (wg.Wait in the callers).
func (m *AppState) CleanKafkaWriters() {
	// Fix: the original message said "kafka readers" in the writers cleanup.
	fmt.Println("clean of kafka writers starts")

	// Hold the lock so a concurrent AddKafkaWriter cannot mutate the slice
	// while it is being iterated.
	m.kafkaWritersList.KafkaWritersLock.Lock()
	defer m.kafkaWritersList.KafkaWritersLock.Unlock()

	for _, w := range m.kafkaWritersList.KafkaWriters {
		if err := w.Close(); err != nil {
			fmt.Printf("Error in closing kafka writer %v\n", err)
		}
	}

	fmt.Println("Kafka writers graceful cleanup complete")
}

// AddKafkaReader builds a consumer-group kafka.Reader for the given topic,
// registers it in the app state so CleanKafkaReaders can close it during
// graceful shutdown, and returns it to the caller. kafkaURL may be a
// comma-separated broker list.
func (m *AppState) AddKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  strings.Split(kafkaURL, ","),
		GroupID:  groupID,
		Topic:    topic,
		MinBytes: 1,
		MaxBytes: 10e6,
	})

	m.kafkaReadersList.KafkaReadersLock.Lock()
	defer m.kafkaReadersList.KafkaReadersLock.Unlock()
	m.kafkaReadersList.KafkaReaders = append(m.kafkaReadersList.KafkaReaders, r)

	return r
}

// CleanKafkaReaders closes every Kafka reader registered via AddKafkaReader.
// Intended to run once during graceful shutdown, after all consuming
// goroutines have finished (wg.Wait in the callers).
func (m *AppState) CleanKafkaReaders() {
	// Symmetric start message with CleanKafkaWriters.
	fmt.Println("clean of kafka readers starts")

	// Hold the lock so a concurrent AddKafkaReader cannot mutate the slice
	// while it is being iterated.
	m.kafkaReadersList.KafkaReadersLock.Lock()
	defer m.kafkaReadersList.KafkaReadersLock.Unlock()

	for _, r := range m.kafkaReadersList.KafkaReaders {
		if err := r.Close(); err != nil {
			fmt.Printf("Error in closing kafka reader %v\n", err)
		}
	}

	fmt.Println("Kafka readers graceful cleanup complete")
}

// GetBeacons returns thread-safe access to beacons list


+ 20
- 12
internal/pkg/kafkaclient/consumer.go Näytä tiedosto

@@ -4,24 +4,32 @@ import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/segmentio/kafka-go"
)

func Consume[T any](r *kafka.Reader, ch chan<- T) {
func Consume[T any](r *kafka.Reader, ch chan<- T, ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
for {
msg, err := r.ReadMessage(context.Background())
if err != nil {
fmt.Println("error reading message:", err)
continue
}
select {
case <-ctx.Done():
fmt.Println("consumer closed")
return
default:
msg, err := r.ReadMessage(ctx)
if err != nil {
fmt.Println("error reading message:", err)
continue
}

var data T
if err := json.Unmarshal(msg.Value, &data); err != nil {
fmt.Println("error decoding:", err)
continue
}
var data T
if err := json.Unmarshal(msg.Value, &data); err != nil {
fmt.Println("error decoding:", err)
continue
}

ch <- data
ch <- data
}
}
}

+ 3
- 0
internal/pkg/kafkaclient/reader.go Näytä tiedosto

@@ -6,6 +6,9 @@ import (
"github.com/segmentio/kafka-go"
)

// Create Kafka reader
//
// Deprecated: Use context manager object instead
func KafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
brokers := strings.Split(kafkaURL, ",")
return kafka.NewReader(kafka.ReaderConfig{


+ 3
- 0
internal/pkg/kafkaclient/writer.go Näytä tiedosto

@@ -6,6 +6,9 @@ import (
"github.com/segmentio/kafka-go"
)

// Create Kafka writer
//
// Deprecated: Use context manager object instead
func KafkaWriter(kafkaURL, topic string) *kafka.Writer {
return &kafka.Writer{
Addr: kafka.TCP(kafkaURL),


+ 12
- 0
internal/pkg/model/types.go Näytä tiedosto

@@ -2,6 +2,8 @@ package model

import (
"sync"

"github.com/segmentio/kafka-go"
)

// Settings defines configuration parameters for presence detection behavior.
@@ -198,5 +200,15 @@ type ApiUpdate struct {
ID string
}

// KafkaReadersList tracks every kafka.Reader created at startup so they can
// be closed together during graceful shutdown.
type KafkaReadersList struct {
	KafkaReadersLock sync.RWMutex // guards KafkaReaders
	KafkaReaders     []*kafka.Reader
}

// KafkaWritersList tracks every kafka.Writer created at startup so they can
// be closed together during graceful shutdown.
type KafkaWritersList struct {
	KafkaWritersLock sync.RWMutex // guards KafkaWriters
	KafkaWriters     []*kafka.Writer
}

var HTTPHostPathPtr *string
var HTTPWSHostPathPtr *string

Ladataan…
Peruuta
Tallenna