您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 
 
 

28 行
438 B

  1. package kafkaclient
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/segmentio/kafka-go"
  7. )
  8. func Consume[T any](r *kafka.Reader, ch chan<- T) {
  9. for {
  10. msg, err := r.ReadMessage(context.Background())
  11. if err != nil {
  12. fmt.Println("error reading message:", err)
  13. continue
  14. }
  15. var data T
  16. if err := json.Unmarshal(msg.Value, &data); err != nil {
  17. fmt.Println("error decoding:", err)
  18. continue
  19. }
  20. ch <- data
  21. }
  22. }