You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

114 rivejä
2.9 KiB

  1. package kafkaclient
  2. import (
  3. "fmt"
  4. "strings"
  5. "sync"
  6. "time"
  7. "github.com/segmentio/kafka-go"
  8. )
  9. type KafkaReadersMap struct {
  10. KafkaReadersLock sync.RWMutex
  11. KafkaReaders map[string]*kafka.Reader
  12. }
  13. type KafkaWritersMap struct {
  14. KafkaWritersLock sync.RWMutex
  15. KafkaWriters map[string]*kafka.Writer
  16. }
  17. type KafkaManager struct {
  18. kafkaReadersMap KafkaReadersMap
  19. kafkaWritersMap KafkaWritersMap
  20. }
  21. func InitKafkaManager() *KafkaManager {
  22. return &KafkaManager{
  23. kafkaReadersMap: KafkaReadersMap{
  24. KafkaReaders: make(map[string]*kafka.Reader),
  25. },
  26. kafkaWritersMap: KafkaWritersMap{
  27. KafkaWriters: make(map[string]*kafka.Writer),
  28. },
  29. }
  30. }
  31. func (m *KafkaManager) AddKafkaWriter(kafkaUrl, topic string) {
  32. kafkaWriter := &kafka.Writer{
  33. Addr: kafka.TCP(kafkaUrl),
  34. Topic: topic,
  35. Balancer: &kafka.LeastBytes{},
  36. Async: false,
  37. RequiredAcks: kafka.RequireAll,
  38. BatchSize: 100,
  39. BatchTimeout: 10 * time.Millisecond,
  40. }
  41. m.kafkaWritersMap.KafkaWritersLock.Lock()
  42. m.kafkaWritersMap.KafkaWriters[topic] = kafkaWriter
  43. m.kafkaWritersMap.KafkaWritersLock.Unlock()
  44. }
  45. func (m *KafkaManager) CleanKafkaWriters() {
  46. fmt.Println("shutdown of kafka readers starts")
  47. m.kafkaWritersMap.KafkaWritersLock.Lock()
  48. for _, r := range m.kafkaWritersMap.KafkaWriters {
  49. if err := r.Close(); err != nil {
  50. fmt.Printf("Error in closing kafka writer %v", err)
  51. }
  52. }
  53. m.kafkaWritersMap.KafkaWritersLock.Unlock()
  54. fmt.Println("Kafka writers graceful shutdown complete")
  55. }
  56. func (m *KafkaManager) AddKafkaReader(kafkaUrl, topic, groupID string) {
  57. brokers := strings.Split(kafkaUrl, ",")
  58. kafkaReader := kafka.NewReader(kafka.ReaderConfig{
  59. Brokers: brokers,
  60. GroupID: groupID,
  61. Topic: topic,
  62. MinBytes: 1,
  63. MaxBytes: 10e6,
  64. })
  65. m.kafkaReadersMap.KafkaReadersLock.Lock()
  66. m.kafkaReadersMap.KafkaReaders[topic] = kafkaReader
  67. m.kafkaReadersMap.KafkaReadersLock.Unlock()
  68. }
  69. func (m *KafkaManager) CleanKafkaReaders() {
  70. m.kafkaReadersMap.KafkaReadersLock.Lock()
  71. for _, r := range m.kafkaReadersMap.KafkaReaders {
  72. if err := r.Close(); err != nil {
  73. fmt.Printf("Error in closing kafka reader %v", err)
  74. }
  75. }
  76. m.kafkaReadersMap.KafkaReadersLock.Unlock()
  77. fmt.Println("Kafka readers graceful shutdown complete")
  78. }
  79. func (m *KafkaManager) PopulateKafkaManager(url, name string, topics []string) {
  80. for _, topic := range topics {
  81. if name != "" {
  82. gid := fmt.Sprintf("%s-%s", topic, name)
  83. m.AddKafkaReader(url, topic, gid)
  84. } else {
  85. m.AddKafkaWriter(url, topic)
  86. }
  87. }
  88. }
  89. func (m *KafkaManager) GetReader(topic string) *kafka.Reader {
  90. m.kafkaReadersMap.KafkaReadersLock.Lock()
  91. defer m.kafkaReadersMap.KafkaReadersLock.Unlock()
  92. return m.kafkaReadersMap.KafkaReaders[topic]
  93. }
  94. func (m *KafkaManager) GetWriter(topic string) *kafka.Writer {
  95. m.kafkaWritersMap.KafkaWritersLock.Lock()
  96. defer m.kafkaWritersMap.KafkaWritersLock.Unlock()
  97. return m.kafkaWritersMap.KafkaWriters[topic]
  98. }