Вы не можете выбрать более 25 тем. Темы должны начинаться с буквы или цифры, могут содержать дефисы (-) и должны содержать не более 35 символов.
 
 
 
 

120 строк
3.0 KiB

  1. package kafkaclient
  2. import (
  3. "fmt"
  4. "log/slog"
  5. "strings"
  6. "sync"
  7. "time"
  8. "github.com/segmentio/kafka-go"
  9. )
  10. type KafkaReadersMap struct {
  11. KafkaReadersLock sync.RWMutex
  12. KafkaReaders map[string]*kafka.Reader
  13. }
  14. type KafkaWritersMap struct {
  15. KafkaWritersLock sync.RWMutex
  16. KafkaWriters map[string]*kafka.Writer
  17. }
  18. type KafkaManager struct {
  19. kafkaReadersMap KafkaReadersMap
  20. kafkaWritersMap KafkaWritersMap
  21. }
  22. func InitKafkaManager() *KafkaManager {
  23. return &KafkaManager{
  24. kafkaReadersMap: KafkaReadersMap{
  25. KafkaReaders: make(map[string]*kafka.Reader),
  26. },
  27. kafkaWritersMap: KafkaWritersMap{
  28. KafkaWriters: make(map[string]*kafka.Writer),
  29. },
  30. }
  31. }
  32. func (m *KafkaManager) AddKafkaWriter(kafkaUrl, topic string) {
  33. kafkaWriter := &kafka.Writer{
  34. Addr: kafka.TCP(kafkaUrl),
  35. Topic: topic,
  36. Balancer: &kafka.LeastBytes{},
  37. Async: false,
  38. RequiredAcks: kafka.RequireAll,
  39. BatchSize: 100,
  40. BatchTimeout: 10 * time.Millisecond,
  41. }
  42. m.kafkaWritersMap.KafkaWritersLock.Lock()
  43. m.kafkaWritersMap.KafkaWriters[topic] = kafkaWriter
  44. m.kafkaWritersMap.KafkaWritersLock.Unlock()
  45. }
  46. func (m *KafkaManager) CleanKafkaWriters() {
  47. msg := "shutdown of kafka writers starts"
  48. slog.Info(msg)
  49. m.kafkaWritersMap.KafkaWritersLock.Lock()
  50. for _, r := range m.kafkaWritersMap.KafkaWriters {
  51. if err := r.Close(); err != nil {
  52. msg := fmt.Sprintf("Error in closing kafka writer %v", err)
  53. slog.Error(msg)
  54. }
  55. }
  56. m.kafkaWritersMap.KafkaWritersLock.Unlock()
  57. msg = "Kafka writers graceful shutdown complete"
  58. slog.Info(msg)
  59. }
  60. func (m *KafkaManager) AddKafkaReader(kafkaUrl, topic, groupID string) {
  61. brokers := strings.Split(kafkaUrl, ",")
  62. kafkaReader := kafka.NewReader(kafka.ReaderConfig{
  63. Brokers: brokers,
  64. GroupID: groupID,
  65. Topic: topic,
  66. MinBytes: 1,
  67. MaxBytes: 10e6,
  68. })
  69. m.kafkaReadersMap.KafkaReadersLock.Lock()
  70. m.kafkaReadersMap.KafkaReaders[topic] = kafkaReader
  71. m.kafkaReadersMap.KafkaReadersLock.Unlock()
  72. }
  73. func (m *KafkaManager) CleanKafkaReaders() {
  74. m.kafkaReadersMap.KafkaReadersLock.Lock()
  75. for _, r := range m.kafkaReadersMap.KafkaReaders {
  76. if err := r.Close(); err != nil {
  77. msg := fmt.Sprintf("Error in closing kafka reader %v", err)
  78. slog.Error(msg)
  79. }
  80. }
  81. m.kafkaReadersMap.KafkaReadersLock.Unlock()
  82. msg := "Kafka readers graceful shutdown complete"
  83. slog.Info(msg)
  84. }
  85. func (m *KafkaManager) PopulateKafkaManager(url, name string, topics []string) {
  86. for _, topic := range topics {
  87. if name != "" {
  88. gid := fmt.Sprintf("%s-%s", topic, name)
  89. m.AddKafkaReader(url, topic, gid)
  90. } else {
  91. m.AddKafkaWriter(url, topic)
  92. }
  93. }
  94. }
  95. func (m *KafkaManager) GetReader(topic string) *kafka.Reader {
  96. m.kafkaReadersMap.KafkaReadersLock.RLock()
  97. defer m.kafkaReadersMap.KafkaReadersLock.RUnlock()
  98. return m.kafkaReadersMap.KafkaReaders[topic]
  99. }
  100. func (m *KafkaManager) GetWriter(topic string) *kafka.Writer {
  101. m.kafkaWritersMap.KafkaWritersLock.RLock()
  102. defer m.kafkaWritersMap.KafkaWritersLock.RUnlock()
  103. return m.kafkaWritersMap.KafkaWriters[topic]
  104. }