25개 이상의 토픽을 선택하실 수 없습니다. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

144 lines
3.6 KiB

  1. package kafkaclient
  2. import (
  3. "fmt"
  4. "log/slog"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/segmentio/kafka-go"
  9. )
  10. type KafkaReadersMap struct {
  11. KafkaReadersLock sync.RWMutex
  12. KafkaReaders map[string]*kafka.Reader
  13. }
  14. type KafkaWritersMap struct {
  15. KafkaWritersLock sync.RWMutex
  16. KafkaWriters map[string]*kafka.Writer
  17. }
  18. type KafkaManager struct {
  19. kafkaReadersMap KafkaReadersMap
  20. kafkaWritersMap KafkaWritersMap
  21. }
  22. func InitKafkaManager() *KafkaManager {
  23. return &KafkaManager{
  24. kafkaReadersMap: KafkaReadersMap{
  25. KafkaReaders: make(map[string]*kafka.Reader),
  26. },
  27. kafkaWritersMap: KafkaWritersMap{
  28. KafkaWriters: make(map[string]*kafka.Writer),
  29. },
  30. }
  31. }
  32. func (m *KafkaManager) AddKafkaWriter(kafkaUrl, topic string) {
  33. kafkaWriter := &kafka.Writer{
  34. Addr: kafka.TCP(kafkaUrl),
  35. Topic: topic,
  36. Balancer: &kafka.LeastBytes{},
  37. Async: false,
  38. RequiredAcks: kafka.RequireAll,
  39. BatchSize: 100,
  40. BatchTimeout: 10 * time.Millisecond,
  41. MaxAttempts: 5,
  42. WriteBackoffMin: 100 * time.Millisecond,
  43. WriteBackoffMax: 1 * time.Second,
  44. WriteTimeout: 5 * time.Second,
  45. }
  46. m.kafkaWritersMap.KafkaWritersLock.Lock()
  47. m.kafkaWritersMap.KafkaWriters[topic] = kafkaWriter
  48. m.kafkaWritersMap.KafkaWritersLock.Unlock()
  49. }
  50. func (m *KafkaManager) CleanKafkaWriters() {
  51. msg := "shutdown of kafka writers starts"
  52. slog.Info(msg)
  53. m.kafkaWritersMap.KafkaWritersLock.Lock()
  54. for _, r := range m.kafkaWritersMap.KafkaWriters {
  55. if err := r.Close(); err != nil {
  56. msg := fmt.Sprintf("Error in closing kafka writer %v", err)
  57. slog.Error(msg)
  58. }
  59. }
  60. m.kafkaWritersMap.KafkaWritersLock.Unlock()
  61. msg = "Kafka writers graceful shutdown complete"
  62. slog.Info(msg)
  63. }
  64. func (m *KafkaManager) AddKafkaReader(kafkaUrl, topic, groupID string) {
  65. brokers := strings.Split(kafkaUrl, ",")
  66. kafkaReader := kafka.NewReader(kafka.ReaderConfig{
  67. Brokers: brokers,
  68. GroupID: groupID,
  69. Topic: topic,
  70. MinBytes: 1,
  71. MaxBytes: 10e6,
  72. })
  73. m.kafkaReadersMap.KafkaReadersLock.Lock()
  74. m.kafkaReadersMap.KafkaReaders[topic] = kafkaReader
  75. m.kafkaReadersMap.KafkaReadersLock.Unlock()
  76. }
  77. func (m *KafkaManager) CleanKafkaReaders() {
  78. m.kafkaReadersMap.KafkaReadersLock.Lock()
  79. for _, r := range m.kafkaReadersMap.KafkaReaders {
  80. if err := r.Close(); err != nil {
  81. msg := fmt.Sprintf("Error in closing kafka reader %v", err)
  82. slog.Error(msg)
  83. }
  84. }
  85. m.kafkaReadersMap.KafkaReadersLock.Unlock()
  86. msg := "Kafka readers graceful shutdown complete"
  87. slog.Info(msg)
  88. }
  89. func (m *KafkaManager) PopulateKafkaManager(url, name string, topics []string) {
  90. for _, topic := range topics {
  91. if name != "" {
  92. gid := fmt.Sprintf("%s-%s", topic, name)
  93. m.AddKafkaReader(url, topic, gid)
  94. } else {
  95. m.AddKafkaWriter(url, topic)
  96. }
  97. }
  98. }
  99. func (m *KafkaManager) GetReader(topic string) *kafka.Reader {
  100. m.kafkaReadersMap.KafkaReadersLock.RLock()
  101. defer m.kafkaReadersMap.KafkaReadersLock.RUnlock()
  102. return m.kafkaReadersMap.KafkaReaders[topic]
  103. }
  104. func (m *KafkaManager) GetWriter(topic string) *kafka.Writer {
  105. m.kafkaWritersMap.KafkaWritersLock.RLock()
  106. defer m.kafkaWritersMap.KafkaWritersLock.RUnlock()
  107. return m.kafkaWritersMap.KafkaWriters[topic]
  108. }
  109. func (m *KafkaManager) GetReaders() []string {
  110. m.kafkaReadersMap.KafkaReadersLock.RLock()
  111. var readers []string
  112. for key := range m.kafkaReadersMap.KafkaReaders {
  113. readers = append(readers, key)
  114. }
  115. m.kafkaReadersMap.KafkaReadersLock.RUnlock()
  116. return readers
  117. }
  118. func (m *KafkaManager) GetWriters() []string {
  119. m.kafkaWritersMap.KafkaWritersLock.RLock()
  120. var writers []string
  121. for key := range m.kafkaWritersMap.KafkaWriters {
  122. writers = append(writers, key)
  123. }
  124. m.kafkaWritersMap.KafkaWritersLock.RUnlock()
  125. return writers
  126. }