You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

38 lines
621 B

  1. package kafkaclient
import (
	"context"
	"encoding/json"
	"errors"
	"io"
	"log/slog"
	"sync"

	"github.com/segmentio/kafka-go"
)
  9. func Consume[T any](r *kafka.Reader, ch chan<- T, ctx context.Context, wg *sync.WaitGroup) {
  10. defer wg.Done()
  11. for {
  12. select {
  13. case <-ctx.Done():
  14. msg := "consumer closed"
  15. slog.Info(msg)
  16. return
  17. default:
  18. msg, err := r.ReadMessage(ctx)
  19. if err != nil {
  20. slog.Error("error reading message", "error", err)
  21. continue
  22. }
  23. var data T
  24. if err := json.Unmarshal(msg.Value, &data); err != nil {
  25. slog.Error("error decoding", "error", err)
  26. continue
  27. }
  28. ch <- data
  29. }
  30. }
  31. }