// Package kafkaclient keeps per-topic registries of kafka-go readers and
// writers behind a KafkaManager.
package kafkaclient

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/segmentio/kafka-go"
)

// KafkaReadersMap is a registry of readers keyed by topic.
// KafkaReadersLock must be held while KafkaReaders is accessed.
type KafkaReadersMap struct {
	KafkaReadersLock sync.RWMutex
	KafkaReaders     map[string]*kafka.Reader
}

// KafkaWritersMap is a registry of writers keyed by topic.
// KafkaWritersLock must be held while KafkaWriters is accessed.
type KafkaWritersMap struct {
	KafkaWritersLock sync.RWMutex
	KafkaWriters     map[string]*kafka.Writer
}

// KafkaManager owns one reader registry and one writer registry.
// Create it with InitKafkaManager so both maps are allocated.
type KafkaManager struct {
	kafkaReadersMap KafkaReadersMap
	kafkaWritersMap KafkaWritersMap
}

// InitKafkaManager returns a KafkaManager with empty reader and writer
// registries.
func InitKafkaManager() *KafkaManager {
	return &KafkaManager{
		kafkaReadersMap: KafkaReadersMap{
			KafkaReaders: make(map[string]*kafka.Reader),
		},
		kafkaWritersMap: KafkaWritersMap{
			KafkaWriters: make(map[string]*kafka.Writer),
		},
	}
}

// AddKafkaWriter creates a writer for topic and registers it under that topic.
//
// kafkaUrl is a broker address or a comma-separated list of broker addresses,
// the same format AddKafkaReader accepts. The writer is configured with
// Async disabled, RequiredAcks set to kafka.RequireAll, the LeastBytes
// balancer, a batch size of 100 and a 10ms batch timeout.
//
// A writer already registered for topic is replaced in the registry; this
// method does not close it.
func (m *KafkaManager) AddKafkaWriter(kafkaUrl, topic string) {
	// Split the list so each broker becomes its own address instead of the
	// whole comma-separated string being handed over as a single host.
	brokers := strings.Split(kafkaUrl, ",")
	kafkaWriter := &kafka.Writer{
		Addr:         kafka.TCP(brokers...),
		Topic:        topic,
		Balancer:     &kafka.LeastBytes{},
		Async:        false,
		RequiredAcks: kafka.RequireAll,
		BatchSize:    100,
		BatchTimeout: 10 * time.Millisecond,
	}

	m.kafkaWritersMap.KafkaWritersLock.Lock()
	m.kafkaWritersMap.KafkaWriters[topic] = kafkaWriter
	m.kafkaWritersMap.KafkaWritersLock.Unlock()
}

// CleanKafkaWriters closes every registered writer. A failure to close one
// writer is printed and does not stop the remaining writers from being closed.
// The writers stay in the registry after they are closed.
func (m *KafkaManager) CleanKafkaWriters() {
	fmt.Println("shutdown of kafka writers starts")

	m.kafkaWritersMap.KafkaWritersLock.Lock()
	for _, w := range m.kafkaWritersMap.KafkaWriters {
		if err := w.Close(); err != nil {
			fmt.Printf("Error in closing kafka writer %v\n", err)
		}
	}
	m.kafkaWritersMap.KafkaWritersLock.Unlock()

	fmt.Println("Kafka writers graceful shutdown complete")
}

// AddKafkaReader creates a consumer-group reader for topic and registers it
// under that topic.
//
// kafkaUrl is a broker address or a comma-separated list of broker addresses.
// The reader is configured with MinBytes 1 and MaxBytes 10e6.
//
// A reader already registered for topic is replaced in the registry; this
// method does not close it.
func (m *KafkaManager) AddKafkaReader(kafkaUrl, topic, groupID string) {
	brokers := strings.Split(kafkaUrl, ",")
	kafkaReader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  brokers,
		GroupID:  groupID,
		Topic:    topic,
		MinBytes: 1,
		MaxBytes: 10e6,
	})

	m.kafkaReadersMap.KafkaReadersLock.Lock()
	m.kafkaReadersMap.KafkaReaders[topic] = kafkaReader
	m.kafkaReadersMap.KafkaReadersLock.Unlock()
}

// CleanKafkaReaders closes every registered reader. A failure to close one
// reader is printed and does not stop the remaining readers from being closed.
// The readers stay in the registry after they are closed.
func (m *KafkaManager) CleanKafkaReaders() {
	m.kafkaReadersMap.KafkaReadersLock.Lock()
	for _, r := range m.kafkaReadersMap.KafkaReaders {
		if err := r.Close(); err != nil {
			fmt.Printf("Error in closing kafka reader %v\n", err)
		}
	}
	m.kafkaReadersMap.KafkaReadersLock.Unlock()

	fmt.Println("Kafka readers graceful shutdown complete")
}

// PopulateKafkaManager registers one client per entry in topics.
//
// When name is non-empty, each topic gets a reader whose group ID is
// "<topic>-<name>". When name is empty, each topic gets a writer instead.
func (m *KafkaManager) PopulateKafkaManager(url, name string, topics []string) {
	for _, topic := range topics {
		if name == "" {
			m.AddKafkaWriter(url, topic)
			continue
		}
		gid := fmt.Sprintf("%s-%s", topic, name)
		m.AddKafkaReader(url, topic, gid)
	}
}

// GetReader returns the reader registered for topic, or nil if there is none.
func (m *KafkaManager) GetReader(topic string) *kafka.Reader {
	// The lookup does not modify the map, so a shared lock is enough.
	m.kafkaReadersMap.KafkaReadersLock.RLock()
	defer m.kafkaReadersMap.KafkaReadersLock.RUnlock()
	return m.kafkaReadersMap.KafkaReaders[topic]
}

// GetWriter returns the writer registered for topic, or nil if there is none.
func (m *KafkaManager) GetWriter(topic string) *kafka.Writer {
	// The lookup does not modify the map, so a shared lock is enough.
	m.kafkaWritersMap.KafkaWritersLock.RLock()
	defer m.kafkaWritersMap.KafkaWritersLock.RUnlock()
	return m.kafkaWritersMap.KafkaWriters[topic]
}