|
- package kafkaclient
-
- import (
- "fmt"
- "strings"
- "sync"
- "time"
-
- "github.com/segmentio/kafka-go"
- )
-
- // KafkaReadersMap bundles the topic-keyed set of kafka readers with the lock
- // that guards it.
- type KafkaReadersMap struct {
- 	// KafkaReadersLock is taken around every access to KafkaReaders.
- 	KafkaReadersLock sync.RWMutex
- 	// KafkaReaders holds one reader per topic name.
- 	KafkaReaders map[string]*kafka.Reader
- }
-
- // KafkaWritersMap bundles the topic-keyed set of kafka writers with the lock
- // that guards it.
- type KafkaWritersMap struct {
- 	// KafkaWritersLock is taken around every access to KafkaWriters.
- 	KafkaWritersLock sync.RWMutex
- 	// KafkaWriters holds one writer per topic name.
- 	KafkaWriters map[string]*kafka.Writer
- }
-
- // KafkaManager owns the kafka readers and writers created by this package,
- // each indexed by topic.
- type KafkaManager struct {
- 	// kafkaReadersMap stores the registered readers and their lock.
- 	kafkaReadersMap KafkaReadersMap
- 	// kafkaWritersMap stores the registered writers and their lock.
- 	kafkaWritersMap KafkaWritersMap
- }
-
- // InitKafkaManager returns a KafkaManager whose reader and writer maps are
- // allocated and empty, ready for AddKafkaReader / AddKafkaWriter calls.
- func InitKafkaManager() *KafkaManager {
- 	manager := new(KafkaManager)
- 	// The maps must be allocated up front: writing to a nil map panics.
- 	manager.kafkaReadersMap.KafkaReaders = map[string]*kafka.Reader{}
- 	manager.kafkaWritersMap.KafkaWriters = map[string]*kafka.Writer{}
- 	return manager
- }
-
- // AddKafkaWriter creates a synchronous writer for topic and registers it under
- // that topic name, replacing any writer previously stored for the same topic
- // (the replaced writer is not closed here).
- //
- // kafkaUrl may be a single broker address or a comma-separated list of broker
- // addresses; it is split the same way AddKafkaReader splits it.
- func (m *KafkaManager) AddKafkaWriter(kafkaUrl, topic string) {
- 	// kafka.TCP is variadic: hand it one address per broker instead of the
- 	// raw comma-joined string.
- 	brokers := strings.Split(kafkaUrl, ",")
- 	kafkaWriter := &kafka.Writer{
- 		Addr:         kafka.TCP(brokers...),
- 		Topic:        topic,
- 		Balancer:     &kafka.LeastBytes{},
- 		Async:        false,
- 		RequiredAcks: kafka.RequireAll,
- 		BatchSize:    100,
- 		BatchTimeout: 10 * time.Millisecond,
- 	}
-
- 	m.kafkaWritersMap.KafkaWritersLock.Lock()
- 	defer m.kafkaWritersMap.KafkaWritersLock.Unlock()
- 	m.kafkaWritersMap.KafkaWriters[topic] = kafkaWriter
- }
-
- // CleanKafkaWriters closes every registered writer. A failure to close one
- // writer is printed and does not stop the remaining writers from being
- // closed. The closed writers stay in the map.
- func (m *KafkaManager) CleanKafkaWriters() {
- 	fmt.Println("shutdown of kafka writers starts")
-
- 	m.kafkaWritersMap.KafkaWritersLock.Lock()
- 	// Deferred so the lock is released even if a Close call panics.
- 	defer m.kafkaWritersMap.KafkaWritersLock.Unlock()
-
- 	for _, w := range m.kafkaWritersMap.KafkaWriters {
- 		if err := w.Close(); err != nil {
- 			fmt.Printf("Error in closing kafka writer %v\n", err)
- 		}
- 	}
- 	fmt.Println("Kafka writers graceful shutdown complete")
- }
-
- // AddKafkaReader builds a consumer-group reader for topic and stores it under
- // that topic name, replacing any reader previously stored for the same topic
- // (the replaced reader is not closed here).
- //
- // kafkaUrl is split on "," to obtain the broker list; groupID is passed to
- // the reader as its consumer group.
- func (m *KafkaManager) AddKafkaReader(kafkaUrl, topic, groupID string) {
- 	cfg := kafka.ReaderConfig{
- 		Brokers:  strings.Split(kafkaUrl, ","),
- 		GroupID:  groupID,
- 		Topic:    topic,
- 		MinBytes: 1,
- 		MaxBytes: 10e6,
- 	}
- 	reader := kafka.NewReader(cfg)
-
- 	readers := &m.kafkaReadersMap
- 	readers.KafkaReadersLock.Lock()
- 	readers.KafkaReaders[topic] = reader
- 	readers.KafkaReadersLock.Unlock()
- }
-
- // CleanKafkaReaders closes every registered reader. A failure to close one
- // reader is printed and does not stop the remaining readers from being
- // closed. The closed readers stay in the map.
- func (m *KafkaManager) CleanKafkaReaders() {
- 	m.kafkaReadersMap.KafkaReadersLock.Lock()
- 	// Deferred so the lock is released even if a Close call panics.
- 	defer m.kafkaReadersMap.KafkaReadersLock.Unlock()
-
- 	for _, r := range m.kafkaReadersMap.KafkaReaders {
- 		if err := r.Close(); err != nil {
- 			fmt.Printf("Error in closing kafka reader %v\n", err)
- 		}
- 	}
- 	fmt.Println("Kafka readers graceful shutdown complete")
- }
-
- // PopulateKafkaManager registers one client per entry in topics, all pointing
- // at url. When name is empty a writer is added for each topic; otherwise a
- // reader is added for each topic with the group ID "<topic>-<name>".
- func (m *KafkaManager) PopulateKafkaManager(url, name string, topics []string) {
- 	for _, t := range topics {
- 		if name == "" {
- 			m.AddKafkaWriter(url, t)
- 			continue
- 		}
- 		groupID := t + "-" + name
- 		m.AddKafkaReader(url, t, groupID)
- 	}
- }
-
- // GetReader returns the reader registered for topic, or nil when no reader
- // has been registered under that name.
- func (m *KafkaManager) GetReader(topic string) *kafka.Reader {
- 	// A lookup does not modify the map, so the shared (read) lock is enough
- 	// and lets concurrent lookups proceed without blocking each other.
- 	m.kafkaReadersMap.KafkaReadersLock.RLock()
- 	defer m.kafkaReadersMap.KafkaReadersLock.RUnlock()
- 	return m.kafkaReadersMap.KafkaReaders[topic]
- }
-
- // GetWriter returns the writer registered for topic, or nil when no writer
- // has been registered under that name.
- func (m *KafkaManager) GetWriter(topic string) *kafka.Writer {
- 	// A lookup does not modify the map, so the shared (read) lock is enough
- 	// and lets concurrent lookups proceed without blocking each other.
- 	m.kafkaWritersMap.KafkaWritersLock.RLock()
- 	defer m.kafkaWritersMap.KafkaWritersLock.RUnlock()
- 	return m.kafkaWritersMap.KafkaWriters[topic]
- }
|