| Автор | SHA1 | Повідомлення | Дата |
|---|---|---|---|
|
|
e14d8e0d97 | graceful shutdown of redis client | 4 дні тому |
|
|
28d70bea24 | feat: gracefully close kafka connections on docker sigterm signal | 4 дні тому |
| @@ -29,11 +29,11 @@ func main() { | |||
| }) | |||
| if err != nil { | |||
| fmt.Println("Could not connect to MQTT broker") | |||
| fmt.Printf("Could not connect to MQTT broker, addr: %s", cfg.MQTTHost) | |||
| panic(err) | |||
| } | |||
| fmt.Println("Successfuly connected to MQTT broker") | |||
| fmt.Printf("Successfuly connected to MQTT broker, addr: %s", cfg.MQTTHost) | |||
| writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "rawbeacons") | |||
| defer writer.Close() | |||
| @@ -5,7 +5,10 @@ import ( | |||
| "context" | |||
| "encoding/hex" | |||
| "fmt" | |||
| "os/signal" | |||
| "strings" | |||
| "sync" | |||
| "syscall" | |||
| "github.com/AFASystems/presence/internal/pkg/common/appcontext" | |||
| "github.com/AFASystems/presence/internal/pkg/common/utils" | |||
| @@ -15,31 +18,36 @@ import ( | |||
| "github.com/segmentio/kafka-go" | |||
| ) | |||
| var wg sync.WaitGroup | |||
| func main() { | |||
| // Load global context to init beacons and latest list | |||
| appState := appcontext.NewAppState() | |||
| cfg := config.Load() | |||
| // Kafka reader for Raw MQTT beacons | |||
| rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw") | |||
| defer rawReader.Close() | |||
| // define context | |||
| ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | |||
| defer stop() | |||
| // Kafka reader for API server updates | |||
| apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api") | |||
| defer apiReader.Close() | |||
| rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw") | |||
| apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "gid-api") | |||
| alertWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "alertbeacons") | |||
| defer alertWriter.Close() | |||
| alertWriter := appState.AddKafkaWriter(cfg.KafkaURL, "alertbeacons") | |||
| fmt.Println("Decoder initialized, subscribed to Kafka topics") | |||
| chRaw := make(chan model.BeaconAdvertisement, 2000) | |||
| chApi := make(chan model.ApiUpdate, 2000) | |||
| go kafkaclient.Consume(rawReader, chRaw) | |||
| go kafkaclient.Consume(apiReader, chApi) | |||
| wg.Add(2) | |||
| go kafkaclient.Consume(rawReader, chRaw, ctx, &wg) | |||
| go kafkaclient.Consume(apiReader, chApi, ctx, &wg) | |||
| eventloop: | |||
| for { | |||
| select { | |||
| case <-ctx.Done(): | |||
| break eventloop | |||
| case msg := <-chRaw: | |||
| processIncoming(msg, appState, alertWriter) | |||
| case msg := <-chApi: | |||
| @@ -52,6 +60,13 @@ func main() { | |||
| } | |||
| } | |||
| } | |||
| fmt.Println("broken out of the main event loop") | |||
| wg.Wait() | |||
| fmt.Println("All go routines have stopped, Beggining to close Kafka connections") | |||
| appState.CleanKafkaReaders() | |||
| appState.CleanKafkaWriters() | |||
| } | |||
| func processIncoming(adv model.BeaconAdvertisement, appState *appcontext.AppState, writer *kafka.Writer) { | |||
| @@ -4,6 +4,9 @@ import ( | |||
| "context" | |||
| "encoding/json" | |||
| "fmt" | |||
| "os/signal" | |||
| "sync" | |||
| "syscall" | |||
| "time" | |||
| "github.com/AFASystems/presence/internal/pkg/common/appcontext" | |||
| @@ -14,21 +17,21 @@ import ( | |||
| "github.com/segmentio/kafka-go" | |||
| ) | |||
| var wg sync.WaitGroup | |||
| func main() { | |||
| // Load global context to init beacons and latest list | |||
| appState := appcontext.NewAppState() | |||
| cfg := config.Load() | |||
| // Kafka reader for Raw MQTT beacons | |||
| rawReader := kafkaclient.KafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc") | |||
| defer rawReader.Close() | |||
| // Define context | |||
| ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | |||
| defer stop() | |||
| // Kafka reader for API server updates | |||
| apiReader := kafkaclient.KafkaReader(cfg.KafkaURL, "apibeacons", "gid-api-loc") | |||
| defer apiReader.Close() | |||
| rawReader := appState.AddKafkaReader(cfg.KafkaURL, "rawbeacons", "gid-raw-loc") | |||
| apiReader := appState.AddKafkaReader(cfg.KafkaURL, "apibeacons", "gid-api-loc") | |||
| writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "locevents") | |||
| defer writer.Close() | |||
| writer := appState.AddKafkaWriter(cfg.KafkaURL, "locevents") | |||
| fmt.Println("Locations algorithm initialized, subscribed to Kafka topics") | |||
| @@ -38,11 +41,15 @@ func main() { | |||
| chRaw := make(chan model.BeaconAdvertisement, 2000) | |||
| chApi := make(chan model.ApiUpdate, 2000) | |||
| go kafkaclient.Consume(rawReader, chRaw) | |||
| go kafkaclient.Consume(apiReader, chApi) | |||
| wg.Add(2) | |||
| go kafkaclient.Consume(rawReader, chRaw, ctx, &wg) | |||
| go kafkaclient.Consume(apiReader, chApi, ctx, &wg) | |||
| eventLoop: | |||
| for { | |||
| select { | |||
| case <-ctx.Done(): | |||
| break eventLoop | |||
| case <-locTicker.C: | |||
| getLikelyLocations(appState, writer) | |||
| case msg := <-chRaw: | |||
| @@ -58,6 +65,13 @@ func main() { | |||
| } | |||
| } | |||
| } | |||
| fmt.Println("broken out of the main event loop") | |||
| wg.Wait() | |||
| fmt.Println("All go routines have stopped, Beggining to close Kafka connections") | |||
| appState.CleanKafkaReaders() | |||
| appState.CleanKafkaWriters() | |||
| } | |||
| func getLikelyLocations(appState *appcontext.AppState, writer *kafka.Writer) { | |||
| @@ -6,7 +6,10 @@ import ( | |||
| "fmt" | |||
| "log" | |||
| "net/http" | |||
| "os/signal" | |||
| "strings" | |||
| "sync" | |||
| "syscall" | |||
| "time" | |||
| "github.com/AFASystems/presence/internal/pkg/common/appcontext" | |||
| @@ -24,102 +27,35 @@ var upgrader = websocket.Upgrader{ | |||
| CheckOrigin: func(r *http.Request) bool { return true }, | |||
| } | |||
| func main() { | |||
| HttpServer("0.0.0.0:1902") | |||
| } | |||
| var wg sync.WaitGroup | |||
| func HttpServer(addr string) { | |||
| func main() { | |||
| cfg := config.Load() | |||
| appState := appcontext.NewAppState() | |||
| // define context | |||
| ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) | |||
| defer stop() | |||
| headersOk := handlers.AllowedHeaders([]string{"X-Requested-With"}) | |||
| originsOk := handlers.AllowedOrigins([]string{"*"}) | |||
| methodsOk := handlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS"}) | |||
| // Kafka writer that relays messages | |||
| writer := kafkaclient.KafkaWriter(cfg.KafkaURL, "apibeacons") | |||
| defer writer.Close() | |||
| settingsWriter := kafkaclient.KafkaWriter(cfg.KafkaURL, "settings") | |||
| defer settingsWriter.Close() | |||
| writer := appState.AddKafkaWriter(cfg.KafkaURL, "apibeacons") | |||
| settingsWriter := appState.AddKafkaWriter(cfg.KafkaURL, "settings") | |||
| // Kafka reader for Raw MQTT beacons | |||
| locationReader := kafkaclient.KafkaReader(cfg.KafkaURL, "locevents", "gid-loc-serv") | |||
| defer locationReader.Close() | |||
| locationReader := appState.AddKafkaReader(cfg.KafkaURL, "locevents", "gid-loc-server") | |||
| alertsReader := appState.AddKafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") | |||
| // Kafka reader for API server updates | |||
| alertsReader := kafkaclient.KafkaReader(cfg.KafkaURL, "alertbeacons", "gid-alert-serv") | |||
| defer alertsReader.Close() | |||
| client := redis.NewClient(&redis.Options{ | |||
| Addr: "127.0.0.1:6379", | |||
| Password: "", | |||
| }) | |||
| ctx := context.Background() | |||
| client := appState.AddValkeyClient(cfg.ValkeyURL) | |||
| // Separate channel for location change? | |||
| chLoc := make(chan model.HTTPLocation, 200) | |||
| chEvents := make(chan model.BeaconEvent, 500) | |||
| go kafkaclient.Consume(locationReader, chLoc) | |||
| go kafkaclient.Consume(alertsReader, chEvents) | |||
| go func() { | |||
| for { | |||
| select { | |||
| case msg := <-chLoc: | |||
| beacon, ok := appState.GetBeacon(msg.ID) | |||
| if !ok { | |||
| appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation}) | |||
| } else { | |||
| beacon.ID = msg.ID | |||
| beacon.Location = msg.Location | |||
| beacon.Distance = msg.Distance | |||
| beacon.LastSeen = msg.LastSeen | |||
| beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation | |||
| appState.UpdateBeacon(msg.ID, beacon) | |||
| } | |||
| key := fmt.Sprintf("beacon:%s", msg.ID) | |||
| hashM, err := msg.RedisHashable() | |||
| if err != nil { | |||
| fmt.Println("Error in converting location into hashmap for Redis insert: ", err) | |||
| continue | |||
| } | |||
| if err := client.HSet(ctx, key, hashM).Err(); err != nil { | |||
| fmt.Println("Error in persisting set in Redis key: ", key) | |||
| continue | |||
| } | |||
| if err := client.SAdd(ctx, "beacons", key).Err(); err != nil { | |||
| fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err) | |||
| } | |||
| case msg := <-chEvents: | |||
| beacon, ok := appState.GetBeacon(msg.ID) | |||
| if !ok { | |||
| appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, BeaconType: msg.Type, HSBattery: int64(msg.Battery), Event: msg.Event}) | |||
| } else { | |||
| beacon.ID = msg.ID | |||
| beacon.BeaconType = msg.Type | |||
| beacon.HSBattery = int64(msg.Battery) | |||
| beacon.Event = msg.Event | |||
| appState.UpdateBeacon(msg.ID, beacon) | |||
| } | |||
| key := fmt.Sprintf("beacon:%s", msg.ID) | |||
| hashM, err := msg.RedisHashable() | |||
| if err != nil { | |||
| fmt.Println("Error in converting location into hashmap for Redis insert: ", err) | |||
| continue | |||
| } | |||
| if err := client.HSet(ctx, key, hashM).Err(); err != nil { | |||
| fmt.Println("Error in persisting set in Redis key: ", key) | |||
| continue | |||
| } | |||
| if err := client.SAdd(ctx, "beacons", key).Err(); err != nil { | |||
| fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err) | |||
| } | |||
| } | |||
| } | |||
| }() | |||
| wg.Add(2) | |||
| go kafkaclient.Consume(locationReader, chLoc, ctx, &wg) | |||
| go kafkaclient.Consume(alertsReader, chEvents, ctx, &wg) | |||
| r := mux.NewRouter() | |||
| @@ -146,7 +82,73 @@ func HttpServer(addr string) { | |||
| // r.HandleFunc("/ws/api/beacons/latest", serveLatestBeaconsWs(client)) | |||
| r.HandleFunc("/ws/broadcast", handleConnections(clients, broadcast)) | |||
| http.ListenAndServe(addr, handlers.CORS(originsOk, headersOk, methodsOk)(r)) | |||
| http.ListenAndServe("0.0.0.0:1902", handlers.CORS(originsOk, headersOk, methodsOk)(r)) | |||
| eventLoop: | |||
| for { | |||
| select { | |||
| case <-ctx.Done(): | |||
| break eventLoop | |||
| case msg := <-chLoc: | |||
| beacon, ok := appState.GetBeacon(msg.ID) | |||
| if !ok { | |||
| appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, Location: msg.Location, Distance: msg.Distance, LastSeen: msg.LastSeen, PreviousConfidentLocation: msg.PreviousConfidentLocation}) | |||
| } else { | |||
| beacon.ID = msg.ID | |||
| beacon.Location = msg.Location | |||
| beacon.Distance = msg.Distance | |||
| beacon.LastSeen = msg.LastSeen | |||
| beacon.PreviousConfidentLocation = msg.PreviousConfidentLocation | |||
| appState.UpdateBeacon(msg.ID, beacon) | |||
| } | |||
| key := fmt.Sprintf("beacon:%s", msg.ID) | |||
| hashM, err := msg.RedisHashable() | |||
| if err != nil { | |||
| fmt.Println("Error in converting location into hashmap for Redis insert: ", err) | |||
| continue | |||
| } | |||
| if err := client.HSet(ctx, key, hashM).Err(); err != nil { | |||
| fmt.Println("Error in persisting set in Redis key: ", key) | |||
| continue | |||
| } | |||
| if err := client.SAdd(ctx, "beacons", key).Err(); err != nil { | |||
| fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err) | |||
| } | |||
| case msg := <-chEvents: | |||
| beacon, ok := appState.GetBeacon(msg.ID) | |||
| if !ok { | |||
| appState.UpdateBeacon(msg.ID, model.Beacon{ID: msg.ID, BeaconType: msg.Type, HSBattery: int64(msg.Battery), Event: msg.Event}) | |||
| } else { | |||
| beacon.ID = msg.ID | |||
| beacon.BeaconType = msg.Type | |||
| beacon.HSBattery = int64(msg.Battery) | |||
| beacon.Event = msg.Event | |||
| appState.UpdateBeacon(msg.ID, beacon) | |||
| } | |||
| key := fmt.Sprintf("beacon:%s", msg.ID) | |||
| hashM, err := msg.RedisHashable() | |||
| if err != nil { | |||
| fmt.Println("Error in converting location into hashmap for Redis insert: ", err) | |||
| continue | |||
| } | |||
| if err := client.HSet(ctx, key, hashM).Err(); err != nil { | |||
| fmt.Println("Error in persisting set in Redis key: ", key) | |||
| continue | |||
| } | |||
| if err := client.SAdd(ctx, "beacons", key).Err(); err != nil { | |||
| fmt.Println("Error in adding beacon to the beacons list for get all operation: ", err) | |||
| } | |||
| } | |||
| } | |||
| fmt.Println("broken out of the main event loop") | |||
| wg.Wait() | |||
| fmt.Println("All go routines have stopped, Beggining to close Kafka connections") | |||
| appState.CleanKafkaReaders() | |||
| appState.CleanKafkaWriters() | |||
| fmt.Println("All kafka clients shutdown, starting shutdown of valkey client") | |||
| appState.CleanValkeyClient() | |||
| } | |||
| func beaconsListSingleHandler(appstate *appcontext.AppState) http.HandlerFunc { | |||
| @@ -1,16 +1,25 @@ | |||
| package appcontext | |||
| import ( | |||
| "fmt" | |||
| "strings" | |||
| "time" | |||
| "github.com/AFASystems/presence/internal/pkg/model" | |||
| "github.com/redis/go-redis/v9" | |||
| "github.com/segmentio/kafka-go" | |||
| ) | |||
| // AppState provides centralized access to application state | |||
| type AppState struct { | |||
| beacons model.BeaconsList | |||
| settings model.Settings | |||
| beaconEvents model.BeaconEventList | |||
| beaconsLookup map[string]struct{} | |||
| latestList model.LatestBeaconsList | |||
| beacons model.BeaconsList | |||
| settings model.Settings | |||
| beaconEvents model.BeaconEventList | |||
| beaconsLookup map[string]struct{} | |||
| latestList model.LatestBeaconsList | |||
| kafkaReadersList model.KafkaReadersList | |||
| kafkaWritersList model.KafkaWritersList | |||
| valkeyDB *redis.Client | |||
| } | |||
| // NewAppState creates a new application context AppState with default values | |||
| @@ -37,7 +46,88 @@ func NewAppState() *AppState { | |||
| latestList: model.LatestBeaconsList{ | |||
| LatestList: make(map[string]model.Beacon), | |||
| }, | |||
| kafkaReadersList: model.KafkaReadersList{ | |||
| KafkaReaders: make([]*kafka.Reader, 0), | |||
| }, | |||
| kafkaWritersList: model.KafkaWritersList{ | |||
| KafkaWriters: make([]*kafka.Writer, 0), | |||
| }, | |||
| } | |||
| } | |||
| func (m *AppState) AddValkeyClient(url string) *redis.Client { | |||
| valkeyDB := redis.NewClient(&redis.Options{ | |||
| Addr: url, | |||
| Password: "", | |||
| }) | |||
| m.valkeyDB = valkeyDB | |||
| return valkeyDB | |||
| } | |||
| func (m *AppState) CleanValkeyClient() { | |||
| fmt.Println("shutdown of valkey client starts") | |||
| if err := m.valkeyDB.Close(); err != nil { | |||
| fmt.Println("Error in shuting down valkey client") | |||
| } | |||
| fmt.Println("Succesfully shutting down valkey client") | |||
| } | |||
| func (m *AppState) AddKafkaWriter(kafkaUrl, topic string) *kafka.Writer { | |||
| kafkaWriter := &kafka.Writer{ | |||
| Addr: kafka.TCP(kafkaUrl), | |||
| Topic: topic, | |||
| Balancer: &kafka.LeastBytes{}, | |||
| Async: false, | |||
| RequiredAcks: kafka.RequireAll, | |||
| BatchSize: 100, | |||
| BatchTimeout: 10 * time.Millisecond, | |||
| } | |||
| m.kafkaWritersList.KafkaWritersLock.Lock() | |||
| m.kafkaWritersList.KafkaWriters = append(m.kafkaWritersList.KafkaWriters, kafkaWriter) | |||
| m.kafkaWritersList.KafkaWritersLock.Unlock() | |||
| return kafkaWriter | |||
| } | |||
| func (m *AppState) CleanKafkaWriters() { | |||
| fmt.Println("shutdown of kafka readers starts") | |||
| for _, r := range m.kafkaWritersList.KafkaWriters { | |||
| if err := r.Close(); err != nil { | |||
| fmt.Printf("Error in closing kafka writer %v", err) | |||
| } | |||
| } | |||
| fmt.Println("Kafka writers graceful shutdown complete") | |||
| } | |||
| func (m *AppState) AddKafkaReader(kafkaUrl, topic, groupID string) *kafka.Reader { | |||
| brokers := strings.Split(kafkaUrl, ",") | |||
| kafkaReader := kafka.NewReader(kafka.ReaderConfig{ | |||
| Brokers: brokers, | |||
| GroupID: groupID, | |||
| Topic: topic, | |||
| MinBytes: 1, | |||
| MaxBytes: 10e6, | |||
| }) | |||
| m.kafkaReadersList.KafkaReadersLock.Lock() | |||
| m.kafkaReadersList.KafkaReaders = append(m.kafkaReadersList.KafkaReaders, kafkaReader) | |||
| m.kafkaReadersList.KafkaReadersLock.Unlock() | |||
| return kafkaReader | |||
| } | |||
| func (m *AppState) CleanKafkaReaders() { | |||
| for _, r := range m.kafkaReadersList.KafkaReaders { | |||
| if err := r.Close(); err != nil { | |||
| fmt.Printf("Error in closing kafka reader %v", err) | |||
| } | |||
| } | |||
| fmt.Println("Kafka readers graceful shutdown complete") | |||
| } | |||
| // GetBeacons returns thread-safe access to beacons list | |||
| @@ -12,6 +12,7 @@ type Config struct { | |||
| DBPath string | |||
| KafkaURL string | |||
| RedisURL string | |||
| ValkeyURL string | |||
| } | |||
| // getEnv returns env var value or a default if not set. | |||
| @@ -32,5 +33,6 @@ func Load() *Config { | |||
| MQTTClientID: getEnv("MQTT_CLIENT_ID", "presence-detector"), | |||
| DBPath: getEnv("DB_PATH", "/data/conf/presence/presence.db"), | |||
| KafkaURL: getEnv("KAFKA_URL", "127.0.0.1:9092"), | |||
| ValkeyURL: getEnv("VALKEY_URL", "127.0.0.1:6379"), | |||
| } | |||
| } | |||
| @@ -4,24 +4,32 @@ import ( | |||
| "context" | |||
| "encoding/json" | |||
| "fmt" | |||
| "sync" | |||
| "github.com/segmentio/kafka-go" | |||
| ) | |||
| func Consume[T any](r *kafka.Reader, ch chan<- T) { | |||
| func Consume[T any](r *kafka.Reader, ch chan<- T, ctx context.Context, wg *sync.WaitGroup) { | |||
| defer wg.Done() | |||
| for { | |||
| msg, err := r.ReadMessage(context.Background()) | |||
| if err != nil { | |||
| fmt.Println("error reading message:", err) | |||
| continue | |||
| } | |||
| select { | |||
| case <-ctx.Done(): | |||
| fmt.Println("consumer closed") | |||
| return | |||
| default: | |||
| msg, err := r.ReadMessage(ctx) | |||
| if err != nil { | |||
| fmt.Println("error reading message:", err) | |||
| continue | |||
| } | |||
| var data T | |||
| if err := json.Unmarshal(msg.Value, &data); err != nil { | |||
| fmt.Println("error decoding:", err) | |||
| continue | |||
| } | |||
| var data T | |||
| if err := json.Unmarshal(msg.Value, &data); err != nil { | |||
| fmt.Println("error decoding:", err) | |||
| continue | |||
| } | |||
| ch <- data | |||
| ch <- data | |||
| } | |||
| } | |||
| } | |||
| @@ -6,6 +6,9 @@ import ( | |||
| "github.com/segmentio/kafka-go" | |||
| ) | |||
| // Create Kafka reader | |||
| // | |||
| // Deprecated: Use context manager object instead | |||
| func KafkaReader(kafkaURL, topic, groupID string) *kafka.Reader { | |||
| brokers := strings.Split(kafkaURL, ",") | |||
| return kafka.NewReader(kafka.ReaderConfig{ | |||
| @@ -6,6 +6,9 @@ import ( | |||
| "github.com/segmentio/kafka-go" | |||
| ) | |||
| // Create Kafka writer | |||
| // | |||
| // Deprecated: Use context manager object instead | |||
| func KafkaWriter(kafkaURL, topic string) *kafka.Writer { | |||
| return &kafka.Writer{ | |||
| Addr: kafka.TCP(kafkaURL), | |||
| @@ -2,6 +2,8 @@ package model | |||
| import ( | |||
| "sync" | |||
| "github.com/segmentio/kafka-go" | |||
| ) | |||
| // Settings defines configuration parameters for presence detection behavior. | |||
| @@ -198,5 +200,15 @@ type ApiUpdate struct { | |||
| ID string | |||
| } | |||
// KafkaReadersList holds every kafka.Reader opened by the application so
// they can all be closed together during graceful shutdown. The RWMutex
// guards concurrent appends to the slice.
type KafkaReadersList struct {
	KafkaReadersLock sync.RWMutex
	KafkaReaders     []*kafka.Reader
}

// KafkaWritersList is the writer-side counterpart of KafkaReadersList,
// tracking kafka.Writers registered for graceful shutdown.
type KafkaWritersList struct {
	KafkaWritersLock sync.RWMutex
	KafkaWriters     []*kafka.Writer
}
| var HTTPHostPathPtr *string | |||
| var HTTPWSHostPathPtr *string | |||