package kafka

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"sort"
	"strconv"
	"sync"
	"sync/atomic"
	"time"
)

const (
	LastOffset  int64 = -1 // The most recent offset available for a partition.
	FirstOffset int64 = -2 // The least recent offset available for a partition.
)

const (
	// defaultCommitRetries holds the number of commit attempts to make
	// before giving up.
	defaultCommitRetries = 3
)

const (
	// defaultFetchMinBytes of 1 byte means that fetch requests are answered as
	// soon as a single byte of data is available or the fetch request times out
	// waiting for data to arrive.
	defaultFetchMinBytes = 1
)

var (
	errOnlyAvailableWithGroup = errors.New("unavailable when GroupID is not set")
	errNotAvailableWithGroup  = errors.New("unavailable when GroupID is set")
)

const (
	// defaultReadBackoffMin/Max set the boundaries for how long the reader
	// waits before polling for new messages.
	defaultReadBackoffMin = 100 * time.Millisecond
	defaultReadBackoffMax = 1 * time.Second
)

// Reader provides a high-level API for consuming messages from kafka.
//
// A Reader automatically manages reconnections to a kafka server, and
// blocking methods have context support for asynchronous cancellations.
//
// Note that it is important to call `Close()` on a `Reader` when a process exits.
// The kafka server needs a graceful disconnect to stop it from continuing to
// attempt to send messages to connected clients. A program will not call
// `Close()` on its own if the process is terminated with SIGINT (ctrl-c at the
// shell) or SIGTERM (as docker stop or a kubernetes restart does). This can
// result in a delay when a new reader on the same topic connects (e.g. new
// process started or new container running). Use a `signal.Notify` handler to
// close the reader on process shutdown; a sketch follows the struct definition
// below.
type Reader struct {
	// immutable fields of the reader
	config ReaderConfig

	// communication channels between the parent reader and its subreaders
	msgs chan readerMessage

	// mutable fields of the reader (synchronized on the mutex)
	mutex   sync.Mutex
	join    sync.WaitGroup
	cancel  context.CancelFunc
	stop    context.CancelFunc
	done    chan struct{}
	commits chan commitRequest
	version int64 // version holds the generation of the spawned readers
	offset  int64
	lag     int64
	closed  bool

	// Without a group subscription (when Reader.config.GroupID == ""),
	// when errors occur, the Reader gets a synthetic readerMessage with
	// a non-nil err set. With group subscriptions however, when an error
	// occurs in Reader.run, there's no reader running (sic, cf. reader vs.
	// Reader) and there's no way to let the high-level methods like
	// FetchMessage know that an error indeed occurred. If an error in run
	// occurs, it will be non-block-sent to this unbuffered channel, where
	// the high-level methods can select{} on it and notify the caller.
	runError chan error

	once  uint32
	stctx context.Context

	// reader stats are all made of atomic values, no need for synchronization.
	// Use a pointer to ensure 64-bit alignment of the values.
	stats *readerStats
}

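// A minimal shutdown sketch (the broker address, group id and topic below are
// placeholders, not values defined by this package): close the reader when the
// process receives SIGINT or SIGTERM, as recommended in the Reader docs above.
//
//	r := kafka.NewReader(kafka.ReaderConfig{
//		Brokers: []string{"localhost:9092"},
//		GroupID: "example-group",
//		Topic:   "example-topic",
//	})
//	sigs := make(chan os.Signal, 1)
//	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
//	go func() {
//		<-sigs
//		r.Close()
//	}()
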
// useConsumerGroup indicates whether the Reader is part of a consumer group.
func (r *Reader) useConsumerGroup() bool { return r.config.GroupID != "" }

func (r *Reader) getTopics() []string {
	if len(r.config.GroupTopics) > 0 {
		return r.config.GroupTopics[:]
	}

	return []string{r.config.Topic}
}

// useSyncCommits indicates whether the Reader is configured to perform sync or
// async commits.
func (r *Reader) useSyncCommits() bool { return r.config.CommitInterval == 0 }

func (r *Reader) unsubscribe() {
	r.cancel()
	r.join.Wait()
	// it would be interesting to drain the r.msgs channel at this point since
	// it will contain buffered messages for partitions that may not be
	// re-assigned to this reader in the next consumer group generation.
	// however, draining the channel could race with the client calling
	// ReadMessage, which could result in messages delivered and/or committed
	// with gaps in the offset. for now, we will err on the side of caution and
	// potentially have those messages be reprocessed in the next generation by
	// another consumer to avoid such a race.
}

func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
	offsets := make(map[topicPartition]int64)
	for topic, assignments := range allAssignments {
		for _, assignment := range assignments {
			key := topicPartition{
				topic:     topic,
				partition: int32(assignment.ID),
			}
			offsets[key] = assignment.Offset
		}
	}

	r.mutex.Lock()
	r.start(offsets)
	r.mutex.Unlock()

	r.withLogger(func(l Logger) {
		l.Printf("subscribed to topics and partitions: %+v", offsets)
	})
}

// commitOffsetsWithRetry attempts to commit the specified offsets and retries
// up to the specified number of times.
func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash, retries int) (err error) {
	const (
		backoffDelayMin = 100 * time.Millisecond
		backoffDelayMax = 5 * time.Second
	)

	for attempt := 0; attempt < retries; attempt++ {
		if attempt != 0 {
			if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
				return
			}
		}

		if err = gen.CommitOffsets(offsetStash); err == nil {
			return
		}
	}

	return // err will not be nil
}

// offsetStash holds offsets by topic => partition => offset.
type offsetStash map[string]map[int]int64

// merge updates the offsetStash with the offsets from the provided messages.
func (o offsetStash) merge(commits []commit) {
	for _, c := range commits {
		offsetsByPartition, ok := o[c.topic]
		if !ok {
			offsetsByPartition = map[int]int64{}
			o[c.topic] = offsetsByPartition
		}

		if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset {
			offsetsByPartition[c.partition] = c.offset
		}
	}
}

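// For illustration: merging commits for topic "t", partition 0 at offsets 3
// and then 5 leaves o["t"][0] == 5, and merging offset 4 afterwards changes
// nothing, because only the highest offset per partition is retained.
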
// reset clears the contents of the offsetStash.
func (o offsetStash) reset() {
	for key := range o {
		delete(o, key)
	}
}

// commitLoopImmediate handles each commit synchronously.
func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) {
	offsets := offsetStash{}

	for {
		select {
		case <-ctx.Done():
			// drain the commit channel and prepare a single, final commit.
			// the commit will combine any outstanding requests and the result
			// will be sent back to all the callers of CommitMessages so that
			// they can return.
			var errchs []chan<- error
			for hasCommits := true; hasCommits; {
				select {
				case req := <-r.commits:
					offsets.merge(req.commits)
					errchs = append(errchs, req.errch)
				default:
					hasCommits = false
				}
			}
			err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
			for _, errch := range errchs {
				// NOTE : this will be a buffered channel and will not block.
				errch <- err
			}
			return

		case req := <-r.commits:
			offsets.merge(req.commits)
			req.errch <- r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
			offsets.reset()
		}
	}
}

// commitLoopInterval handles each commit asynchronously with a period defined
// by ReaderConfig.CommitInterval.
func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) {
	ticker := time.NewTicker(r.config.CommitInterval)
	defer ticker.Stop()

	// the offset stash should not survive rebalances b/c the consumer may
	// receive new assignments.
	offsets := offsetStash{}

	commit := func() {
		if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil {
			r.withErrorLogger(func(l Logger) { l.Printf("%v", err) })
		} else {
			offsets.reset()
		}
	}

	for {
		select {
		case <-ctx.Done():
			// drain the commit channel in order to prepare the final commit.
			for hasCommits := true; hasCommits; {
				select {
				case req := <-r.commits:
					offsets.merge(req.commits)
				default:
					hasCommits = false
				}
			}
			commit()
			return

		case <-ticker.C:
			commit()

		case req := <-r.commits:
			offsets.merge(req.commits)
		}
	}
}

// commitLoop processes commits off the commit chan.
func (r *Reader) commitLoop(ctx context.Context, gen *Generation) {
	r.withLogger(func(l Logger) {
		l.Printf("started commit for group %s\n", r.config.GroupID)
	})
	defer r.withLogger(func(l Logger) {
		l.Printf("stopped commit for group %s\n", r.config.GroupID)
	})

	if r.useSyncCommits() {
		r.commitLoopImmediate(ctx, gen)
	} else {
		r.commitLoopInterval(ctx, gen)
	}
}

// run provides the main consumer group management loop. Each iteration performs the
// handshake to join the Reader to the consumer group.
//
// This function is responsible for closing the consumer group upon exit.
func (r *Reader) run(cg *ConsumerGroup) {
	defer close(r.done)
	defer cg.Close()

	r.withLogger(func(l Logger) {
		l.Printf("entering loop for consumer group, %v\n", r.config.GroupID)
	})

	for {
		// Limit the number of attempts at waiting for the next
		// consumer generation.
		var err error
		var gen *Generation
		for attempt := 1; attempt <= r.config.MaxAttempts; attempt++ {
			gen, err = cg.Next(r.stctx)
			if err == nil {
				break
			}
			if errors.Is(err, r.stctx.Err()) {
				return
			}
			r.stats.errors.observe(1)
			r.withErrorLogger(func(l Logger) {
				l.Printf("%v", err)
			})
			// Continue with next attempt...
		}
		if err != nil {
			// All attempts have failed.
			select {
			case r.runError <- err:
				// If somebody's receiving on the runError, let
				// them know the error occurred.
			default:
				// Otherwise, don't block to allow healing.
			}
			continue
		}

		r.stats.rebalances.observe(1)

		r.subscribe(gen.Assignments)

		gen.Start(func(ctx context.Context) {
			r.commitLoop(ctx, gen)
		})
		gen.Start(func(ctx context.Context) {
			// wait for the generation to end and then unsubscribe.
			select {
			case <-ctx.Done():
				// continue to next generation
			case <-r.stctx.Done():
				// this will be the last loop because the reader is closed.
			}
			r.unsubscribe()
		})
	}
}

// ReaderConfig is a configuration object used to create new instances of
// Reader.
type ReaderConfig struct {
	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string

	// GroupID holds the optional consumer group id. If GroupID is specified,
	// then Partition should NOT be specified, not even the default value of 0.
	GroupID string

	// GroupTopics allows specifying multiple topics, but can only be used in
	// combination with GroupID, as it is a consumer-group feature. As such, if
	// GroupID is set, then either Topic or GroupTopics must be defined.
	GroupTopics []string

	// The topic to read messages from.
	Topic string

	// Partition to read messages from. Either Partition or GroupID may
	// be assigned, but not both.
	Partition int

	// A dialer used to open connections to the kafka server. This field is
	// optional; if nil, the default dialer is used instead.
	Dialer *Dialer

	// The capacity of the internal message queue, defaults to 100 if none is
	// set.
	QueueCapacity int

	// MinBytes indicates to the broker the minimum batch size that the consumer
	// will accept. Setting a high minimum when consuming from a low-volume topic
	// may result in delayed delivery when the broker does not have enough data to
	// satisfy the defined minimum.
	//
	// Default: 1
	MinBytes int

	// MaxBytes indicates to the broker the maximum batch size that the consumer
	// will accept. The broker will truncate a message to satisfy this maximum, so
	// choose a value that is high enough for your largest message size.
	//
	// Default: 1MB
	MaxBytes int

	// Maximum amount of time to wait for new data to come when fetching batches
	// of messages from kafka.
	//
	// Default: 10s
	MaxWait time.Duration

	// ReadBatchTimeout is the amount of time to wait to fetch a message from a
	// batch of kafka messages.
	//
	// Default: 10s
	ReadBatchTimeout time.Duration

	// ReadLagInterval sets the frequency at which the reader lag is updated.
	// Setting this field to a negative value disables lag reporting.
	ReadLagInterval time.Duration

	// GroupBalancers is the priority-ordered list of client-side consumer group
	// balancing strategies that will be offered to the coordinator. The first
	// strategy that all group members support will be chosen by the leader.
	//
	// Default: [Range, RoundRobin]
	//
	// Only used when GroupID is set
	GroupBalancers []GroupBalancer

	// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
	// group heartbeat update.
	//
	// Default: 3s
	//
	// Only used when GroupID is set
	HeartbeatInterval time.Duration

	// CommitInterval indicates the interval at which offsets are committed to
	// the broker. If 0, commits will be handled synchronously.
	//
	// Default: 0
	//
	// Only used when GroupID is set
	CommitInterval time.Duration

	// PartitionWatchInterval indicates how often a reader checks for partition changes.
	// If a reader sees a partition change (such as a partition add) it will rebalance the group
	// picking up new partitions.
	//
	// Default: 5s
	//
	// Only used when GroupID is set and WatchPartitionChanges is set.
	PartitionWatchInterval time.Duration

	// WatchPartitionChanges is used to inform kafka-go that a consumer group should be
	// polling the brokers and rebalancing if any partition changes happen to the topic.
	WatchPartitionChanges bool

	// SessionTimeout optionally sets the length of time that may pass without a heartbeat
	// before the coordinator considers the consumer dead and initiates a rebalance.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	SessionTimeout time.Duration

	// RebalanceTimeout optionally sets the length of time the coordinator will wait
	// for members to join as part of a rebalance. For kafka servers under higher
	// load, it may be useful to set this value higher.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	RebalanceTimeout time.Duration

	// JoinGroupBackoff optionally sets the length of time to wait between re-joining
	// the consumer group after an error.
	//
	// Default: 5s
	JoinGroupBackoff time.Duration

	// RetentionTime optionally sets the length of time the consumer group will be saved
	// by the broker. -1 will disable the setting and leave the
	// retention up to the broker's offsets.retention.minutes property. By
	// default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >= 2.0.
	//
	// Default: -1
	//
	// Only used when GroupID is set
	RetentionTime time.Duration

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset. If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	//
	// Default: FirstOffset
	//
	// Only used when GroupID is set
	StartOffset int64

	// ReadBackoffMin optionally sets the smallest amount of time the reader will wait before
	// polling for new messages.
	//
	// Default: 100ms
	ReadBackoffMin time.Duration

	// ReadBackoffMax optionally sets the maximum amount of time the reader will wait before
	// polling for new messages.
	//
	// Default: 1s
	ReadBackoffMax time.Duration

	// If not nil, specifies a logger used to report internal changes within the
	// reader.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the reader falls
	// back to using Logger instead.
	ErrorLogger Logger

	// IsolationLevel controls the visibility of transactional records.
	// ReadUncommitted makes all records visible. With ReadCommitted only
	// non-transactional and committed records are visible.
	IsolationLevel IsolationLevel

	// Limit of how many attempts to connect will be made before returning the error.
	//
	// The default is to try 3 times.
	MaxAttempts int

	// OffsetOutOfRangeError indicates that the reader should return an error in
	// the event of an OffsetOutOfRange error, rather than retrying indefinitely.
	// This flag is being added to retain backwards-compatibility, so it will be
	// removed in a future version of kafka-go.
	OffsetOutOfRangeError bool
}

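// A sketch of a consumer-group configuration (the broker address, group id and
// topic are placeholder values, not defaults of this package):
//
//	config := kafka.ReaderConfig{
//		Brokers:        []string{"localhost:9092"},
//		GroupID:        "consumer-group-id",
//		Topic:          "topic-A",
//		MinBytes:       10e3,        // 10KB
//		MaxBytes:       10e6,        // 10MB
//		CommitInterval: time.Second, // commit offsets asynchronously every second
//	}
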
// Validate method validates ReaderConfig properties.
func (config *ReaderConfig) Validate() error {
	if len(config.Brokers) == 0 {
		return errors.New("cannot create a new kafka reader with an empty list of broker addresses")
	}

	if config.Partition < 0 || config.Partition >= math.MaxInt32 {
		return fmt.Errorf("partition number out of bounds: %d", config.Partition)
	}

	if config.MinBytes < 0 {
		return fmt.Errorf("invalid negative minimum batch size (min = %d)", config.MinBytes)
	}

	if config.MaxBytes < 0 {
		return fmt.Errorf("invalid negative maximum batch size (max = %d)", config.MaxBytes)
	}

	if config.GroupID != "" {
		if config.Partition != 0 {
			return errors.New("either Partition or GroupID may be specified, but not both")
		}

		if len(config.Topic) == 0 && len(config.GroupTopics) == 0 {
			return errors.New("either Topic or GroupTopics must be specified with GroupID")
		}
	} else if len(config.Topic) == 0 {
		return errors.New("cannot create a new kafka reader with an empty topic")
	}

	if config.MinBytes > config.MaxBytes {
		return fmt.Errorf("minimum batch size greater than the maximum (min = %d, max = %d)", config.MinBytes, config.MaxBytes)
	}

	if config.ReadBackoffMax < 0 {
		return fmt.Errorf("ReadBackoffMax out of bounds: %d", config.ReadBackoffMax)
	}

	if config.ReadBackoffMin < 0 {
		return fmt.Errorf("ReadBackoffMin out of bounds: %d", config.ReadBackoffMin)
	}

	return nil
}

// ReaderStats is a data structure returned by a call to Reader.Stats that exposes
// details about the behavior of the reader.
type ReaderStats struct {
	Dials      int64 `metric:"kafka.reader.dial.count" type:"counter"`
	Fetches    int64 `metric:"kafka.reader.fetch.count" type:"counter"`
	Messages   int64 `metric:"kafka.reader.message.count" type:"counter"`
	Bytes      int64 `metric:"kafka.reader.message.bytes" type:"counter"`
	Rebalances int64 `metric:"kafka.reader.rebalance.count" type:"counter"`
	Timeouts   int64 `metric:"kafka.reader.timeout.count" type:"counter"`
	Errors     int64 `metric:"kafka.reader.error.count" type:"counter"`

	DialTime   DurationStats `metric:"kafka.reader.dial.seconds"`
	ReadTime   DurationStats `metric:"kafka.reader.read.seconds"`
	WaitTime   DurationStats `metric:"kafka.reader.wait.seconds"`
	FetchSize  SummaryStats  `metric:"kafka.reader.fetch.size"`
	FetchBytes SummaryStats  `metric:"kafka.reader.fetch.bytes"`

	Offset        int64         `metric:"kafka.reader.offset" type:"gauge"`
	Lag           int64         `metric:"kafka.reader.lag" type:"gauge"`
	MinBytes      int64         `metric:"kafka.reader.fetch_bytes.min" type:"gauge"`
	MaxBytes      int64         `metric:"kafka.reader.fetch_bytes.max" type:"gauge"`
	MaxWait       time.Duration `metric:"kafka.reader.fetch_wait.max" type:"gauge"`
	QueueLength   int64         `metric:"kafka.reader.queue.length" type:"gauge"`
	QueueCapacity int64         `metric:"kafka.reader.queue.capacity" type:"gauge"`

	ClientID  string `tag:"client_id"`
	Topic     string `tag:"topic"`
	Partition string `tag:"partition"`

	// The original `Fetches` field had a typo where the metric name was called
	// "kafak..." instead of "kafka...", in order to offer time to fix monitors
	// that may be relying on this mistake we are temporarily introducing this
	// field.
	DeprecatedFetchesWithTypo int64 `metric:"kafak.reader.fetch.count" type:"counter"`
}

// readerStats is a struct that contains statistics on a reader.
type readerStats struct {
	dials      counter
	fetches    counter
	messages   counter
	bytes      counter
	rebalances counter
	timeouts   counter
	errors     counter
	dialTime   summary
	readTime   summary
	waitTime   summary
	fetchSize  summary
	fetchBytes summary
	offset     gauge
	lag        gauge
	partition  string
}

// NewReader creates and returns a new Reader configured with config.
// The offset is initialized to FirstOffset.
func NewReader(config ReaderConfig) *Reader {
	if err := config.Validate(); err != nil {
		panic(err)
	}

	if config.GroupID != "" {
		if len(config.GroupBalancers) == 0 {
			config.GroupBalancers = []GroupBalancer{
				RangeGroupBalancer{},
				RoundRobinGroupBalancer{},
			}
		}
	}

	if config.Dialer == nil {
		config.Dialer = DefaultDialer
	}

	if config.MaxBytes == 0 {
		config.MaxBytes = 1e6 // 1 MB
	}

	if config.MinBytes == 0 {
		config.MinBytes = defaultFetchMinBytes
	}

	if config.MaxWait == 0 {
		config.MaxWait = 10 * time.Second
	}

	if config.ReadBatchTimeout == 0 {
		config.ReadBatchTimeout = 10 * time.Second
	}

	if config.ReadLagInterval == 0 {
		config.ReadLagInterval = 1 * time.Minute
	}

	if config.ReadBackoffMin == 0 {
		config.ReadBackoffMin = defaultReadBackoffMin
	}

	if config.ReadBackoffMax == 0 {
		config.ReadBackoffMax = defaultReadBackoffMax
	}

	if config.ReadBackoffMax < config.ReadBackoffMin {
		panic(fmt.Errorf("ReadBackoffMax %d smaller than ReadBackoffMin %d", config.ReadBackoffMax, config.ReadBackoffMin))
	}

	if config.QueueCapacity == 0 {
		config.QueueCapacity = 100
	}

	if config.MaxAttempts == 0 {
		config.MaxAttempts = 3
	}

	// when configured as a consumer group; stats should report a partition of -1
	readerStatsPartition := config.Partition
	if config.GroupID != "" {
		readerStatsPartition = -1
	}

	// when configured as a consumer group, start version as 1 to ensure that only
	// the rebalance function will start readers
	version := int64(0)
	if config.GroupID != "" {
		version = 1
	}

	stctx, stop := context.WithCancel(context.Background())
	r := &Reader{
		config:  config,
		msgs:    make(chan readerMessage, config.QueueCapacity),
		cancel:  func() {},
		commits: make(chan commitRequest, config.QueueCapacity),
		stop:    stop,
		offset:  FirstOffset,
		stctx:   stctx,
		stats: &readerStats{
			dialTime:   makeSummary(),
			readTime:   makeSummary(),
			waitTime:   makeSummary(),
			fetchSize:  makeSummary(),
			fetchBytes: makeSummary(),
			// Generate the string representation of the partition number only
			// once when the reader is created.
			partition: strconv.Itoa(readerStatsPartition),
		},
		version: version,
	}

	if r.useConsumerGroup() {
		r.done = make(chan struct{})
		r.runError = make(chan error)
		cg, err := NewConsumerGroup(ConsumerGroupConfig{
			ID:                     r.config.GroupID,
			Brokers:                r.config.Brokers,
			Dialer:                 r.config.Dialer,
			Topics:                 r.getTopics(),
			GroupBalancers:         r.config.GroupBalancers,
			HeartbeatInterval:      r.config.HeartbeatInterval,
			PartitionWatchInterval: r.config.PartitionWatchInterval,
			WatchPartitionChanges:  r.config.WatchPartitionChanges,
			SessionTimeout:         r.config.SessionTimeout,
			RebalanceTimeout:       r.config.RebalanceTimeout,
			JoinGroupBackoff:       r.config.JoinGroupBackoff,
			RetentionTime:          r.config.RetentionTime,
			StartOffset:            r.config.StartOffset,
			Logger:                 r.config.Logger,
			ErrorLogger:            r.config.ErrorLogger,
		})
		if err != nil {
			panic(err)
		}
		go r.run(cg)
	}

	return r
}

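// A minimal usage sketch for a plain partition reader (assumes a reachable
// broker at localhost:9092; the topic and partition are placeholders):
//
//	r := kafka.NewReader(kafka.ReaderConfig{
//		Brokers:   []string{"localhost:9092"},
//		Topic:     "topic-A",
//		Partition: 0,
//	})
//	defer r.Close()
//
//	for {
//		m, err := r.ReadMessage(context.Background())
//		if err != nil {
//			break
//		}
//		fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
//	}
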
// Config returns the reader's configuration.
func (r *Reader) Config() ReaderConfig {
	return r.config
}

// Close closes the stream, preventing the program from reading any more
// messages from it.
func (r *Reader) Close() error {
	atomic.StoreUint32(&r.once, 1)

	r.mutex.Lock()
	closed := r.closed
	r.closed = true
	r.mutex.Unlock()

	r.cancel()
	r.stop()
	r.join.Wait()

	if r.done != nil {
		<-r.done
	}

	if !closed {
		close(r.msgs)
	}

	return nil
}

// ReadMessage reads and returns the next message from r. The method call
// blocks until a message becomes available, or an error occurs. The program
// may also specify a context to asynchronously cancel the blocking operation.
//
// The method returns io.EOF to indicate that the reader has been closed.
//
// If consumer groups are used, ReadMessage will automatically commit the
// offset when called. Note that this could result in an offset being committed
// before the message is fully processed.
//
// If more fine-grained control of when offsets are committed is required, it
// is recommended to use FetchMessage with CommitMessages instead.
func (r *Reader) ReadMessage(ctx context.Context) (Message, error) {
	m, err := r.FetchMessage(ctx)
	if err != nil {
		return Message{}, fmt.Errorf("fetching message: %w", err)
	}

	if r.useConsumerGroup() {
		if err := r.CommitMessages(ctx, m); err != nil {
			return Message{}, fmt.Errorf("committing message: %w", err)
		}
	}

	return m, nil
}

// FetchMessage reads and returns the next message from r. The method call
// blocks until a message becomes available, or an error occurs. The program
// may also specify a context to asynchronously cancel the blocking operation.
//
// The method returns io.EOF to indicate that the reader has been closed.
//
// FetchMessage does not commit offsets automatically when using consumer groups.
// Use CommitMessages to commit the offset.
func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
	r.activateReadLag()

	for {
		r.mutex.Lock()

		if !r.closed && r.version == 0 {
			r.start(r.getTopicPartitionOffset())
		}

		version := r.version
		r.mutex.Unlock()

		select {
		case <-ctx.Done():
			return Message{}, ctx.Err()

		case err := <-r.runError:
			return Message{}, err

		case m, ok := <-r.msgs:
			if !ok {
				return Message{}, io.EOF
			}

			if m.version >= version {
				r.mutex.Lock()

				switch {
				case m.error != nil:
				case version == r.version:
					r.offset = m.message.Offset + 1
					r.lag = m.watermark - r.offset
				}

				r.mutex.Unlock()

				if errors.Is(m.error, io.EOF) {
					// io.EOF is used as a marker to indicate that the stream
					// has been closed, in case it was received from the inner
					// reader we don't want to confuse the program and replace
					// the error with io.ErrUnexpectedEOF.
					m.error = io.ErrUnexpectedEOF
				}

				return m.message, m.error
			}
		}
	}
}

// CommitMessages commits the list of messages passed as argument. The program
// may pass a context to asynchronously cancel the commit operation when it was
// configured to be blocking.
//
// Because kafka consumer groups track a single offset per partition, the
// highest message offset passed to CommitMessages will cause all previous
// messages to be committed. Applications need to account for these Kafka
// limitations when committing messages, and maintain message ordering if they
// need strong delivery guarantees. This property makes it valid to pass only
// the last message seen to CommitMessages in order to move the offset of the
// topic/partition it belonged to forward, effectively committing all previous
// messages in the partition.
func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
	if !r.useConsumerGroup() {
		return errOnlyAvailableWithGroup
	}

	var errch <-chan error
	creq := commitRequest{
		commits: makeCommits(msgs...),
	}

	if r.useSyncCommits() {
		ch := make(chan error, 1)
		errch, creq.errch = ch, ch
	}

	select {
	case r.commits <- creq:
	case <-ctx.Done():
		return ctx.Err()
	case <-r.stctx.Done():
		// This context is used to ensure we don't allow commits after the
		// reader was closed.
		return io.ErrClosedPipe
	}

	if !r.useSyncCommits() {
		return nil
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-errch:
		return err
	}
}

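// A sketch of the fetch-then-commit pattern described above (process is a
// hypothetical handler; r is a group reader): committing only the last message
// handled moves the partition offset past all earlier messages.
//
//	for {
//		m, err := r.FetchMessage(ctx)
//		if err != nil {
//			break
//		}
//		process(m)
//		if err := r.CommitMessages(ctx, m); err != nil {
//			log.Fatal("failed to commit messages:", err)
//		}
//	}
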
// ReadLag returns the current lag of the reader by fetching the last offset of
// the topic and partition and computing the difference between that value and
// the offset of the last message returned by ReadMessage.
//
// This method is intended to be used in cases where a program may be unable to
// call ReadMessage to update the value returned by Lag, but still needs to get
// an up to date estimation of how far behind the reader is. For example when
// the consumer is not ready to process the next message.
//
// The function returns a lag of zero when the reader's current offset is
// negative.
func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error) {
	if r.useConsumerGroup() {
		return 0, errNotAvailableWithGroup
	}

	type offsets struct {
		first int64
		last  int64
	}

	offch := make(chan offsets, 1)
	errch := make(chan error, 1)

	go func() {
		var off offsets
		var err error

		for _, broker := range r.config.Brokers {
			var conn *Conn

			if conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition); err != nil {
				continue
			}

			deadline, _ := ctx.Deadline()
			conn.SetDeadline(deadline)

			off.first, off.last, err = conn.ReadOffsets()
			conn.Close()

			if err == nil {
				break
			}
		}

		if err != nil {
			errch <- err
		} else {
			offch <- off
		}
	}()

	select {
	case off := <-offch:
		switch cur := r.Offset(); {
		case cur == FirstOffset:
			lag = off.last - off.first
		case cur == LastOffset:
			lag = 0
		default:
			lag = off.last - cur
		}
	case err = <-errch:
	case <-ctx.Done():
		err = ctx.Err()
	}

	return
}

// Offset returns the current absolute offset of the reader, or -1
// if r is backed by a consumer group.
func (r *Reader) Offset() int64 {
	if r.useConsumerGroup() {
		return -1
	}

	r.mutex.Lock()
	offset := r.offset
	r.mutex.Unlock()
	r.withLogger(func(log Logger) {
		log.Printf("looking up offset of kafka reader for partition %d of %s: %s", r.config.Partition, r.config.Topic, toHumanOffset(offset))
	})
	return offset
}

// Lag returns the lag of the last message returned by ReadMessage, or -1
// if r is backed by a consumer group.
func (r *Reader) Lag() int64 {
	if r.useConsumerGroup() {
		return -1
	}

	r.mutex.Lock()
	lag := r.lag
	r.mutex.Unlock()
	return lag
}

// SetOffset changes the offset from which the next batch of messages will be
// read. The method fails with io.ErrClosedPipe if the reader has already been closed.
//
// From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first
// or last available offset in the partition. Please note while -1 and -2 were accepted
// to indicate the first or last offset in previous versions, the meanings of the numbers
// were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol
// specification.
func (r *Reader) SetOffset(offset int64) error {
	if r.useConsumerGroup() {
		return errNotAvailableWithGroup
	}

	var err error
	r.mutex.Lock()

	if r.closed {
		err = io.ErrClosedPipe
	} else if offset != r.offset {
		r.withLogger(func(log Logger) {
			log.Printf("setting the offset of the kafka reader for partition %d of %s from %s to %s",
				r.config.Partition, r.config.Topic, toHumanOffset(r.offset), toHumanOffset(offset))
		})
		r.offset = offset

		if r.version != 0 {
			r.start(r.getTopicPartitionOffset())
		}

		r.activateReadLag()
	}

	r.mutex.Unlock()
	return err
}

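// For example (a sketch; only valid when the reader is not part of a consumer
// group): skip directly to the newest available message before reading,
//
//	if err := r.SetOffset(kafka.LastOffset); err != nil {
//		// the reader has already been closed
//	}
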
// SetOffsetAt changes the offset from which the next batch of messages will be
// read given the timestamp t.
//
// The method fails if it is unable to connect to the partition leader, is
// unable to read the offset for the given timestamp, or if the reader has
// been closed.
func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error {
	r.mutex.Lock()
	if r.closed {
		r.mutex.Unlock()
		return io.ErrClosedPipe
	}
	r.mutex.Unlock()

	if len(r.config.Brokers) < 1 {
		return errors.New("no brokers in config")
	}
	var conn *Conn
	var err error
	for _, broker := range r.config.Brokers {
		conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition)
		if err != nil {
			continue
		}

		deadline, _ := ctx.Deadline()
		conn.SetDeadline(deadline)
		offset, err := conn.ReadOffset(t)
		conn.Close()
		if err != nil {
			return err
		}

		return r.SetOffset(offset)
	}
	return fmt.Errorf("error dialing all brokers, one of the errors: %w", err)
}

// Stats returns a snapshot of the reader stats since the last time the method
// was called, or since the reader was created if it is called for the first
// time.
//
// A typical use of this method is to spawn a goroutine that will periodically
// call Stats on a kafka reader and report the metrics to a stats collection
// system.
func (r *Reader) Stats() ReaderStats {
	stats := ReaderStats{
		Dials:         r.stats.dials.snapshot(),
		Fetches:       r.stats.fetches.snapshot(),
		Messages:      r.stats.messages.snapshot(),
		Bytes:         r.stats.bytes.snapshot(),
		Rebalances:    r.stats.rebalances.snapshot(),
		Timeouts:      r.stats.timeouts.snapshot(),
		Errors:        r.stats.errors.snapshot(),
		DialTime:      r.stats.dialTime.snapshotDuration(),
		ReadTime:      r.stats.readTime.snapshotDuration(),
		WaitTime:      r.stats.waitTime.snapshotDuration(),
		FetchSize:     r.stats.fetchSize.snapshot(),
		FetchBytes:    r.stats.fetchBytes.snapshot(),
		Offset:        r.stats.offset.snapshot(),
		Lag:           r.stats.lag.snapshot(),
		MinBytes:      int64(r.config.MinBytes),
		MaxBytes:      int64(r.config.MaxBytes),
		MaxWait:       r.config.MaxWait,
		QueueLength:   int64(len(r.msgs)),
		QueueCapacity: int64(cap(r.msgs)),
		ClientID:      r.config.Dialer.ClientID,
		Topic:         r.config.Topic,
		Partition:     r.stats.partition,
	}
	// TODO: remove when we get rid of the deprecated field.
	stats.DeprecatedFetchesWithTypo = stats.Fetches
	return stats
}

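// A sketch of the periodic collection described above (report is a
// hypothetical function pushing the snapshot to a metrics system):
//
//	go func() {
//		for range time.Tick(30 * time.Second) {
//			report(r.Stats())
//		}
//	}()
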
func (r *Reader) getTopicPartitionOffset() map[topicPartition]int64 {
	key := topicPartition{topic: r.config.Topic, partition: int32(r.config.Partition)}
	return map[topicPartition]int64{key: r.offset}
}

func (r *Reader) withLogger(do func(Logger)) {
	if r.config.Logger != nil {
		do(r.config.Logger)
	}
}

func (r *Reader) withErrorLogger(do func(Logger)) {
	if r.config.ErrorLogger != nil {
		do(r.config.ErrorLogger)
	} else {
		r.withLogger(do)
	}
}

func (r *Reader) activateReadLag() {
	if r.config.ReadLagInterval > 0 && atomic.CompareAndSwapUint32(&r.once, 0, 1) {
		// read lag will only be calculated when not using consumer groups
		// todo discuss how capturing read lag should interact with rebalancing
		if !r.useConsumerGroup() {
			go r.readLag(r.stctx)
		}
	}
}

func (r *Reader) readLag(ctx context.Context) {
	ticker := time.NewTicker(r.config.ReadLagInterval)
	defer ticker.Stop()

	for {
		timeout, cancel := context.WithTimeout(ctx, r.config.ReadLagInterval/2)
		lag, err := r.ReadLag(timeout)
		cancel()

		if err != nil {
			r.stats.errors.observe(1)
			r.withErrorLogger(func(log Logger) {
				log.Printf("kafka reader failed to read lag of partition %d of %s: %s", r.config.Partition, r.config.Topic, err)
			})
		} else {
			r.stats.lag.observe(lag)
		}

		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}

func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
	if r.closed {
		// don't start child reader if parent Reader is closed
		return
	}

	ctx, cancel := context.WithCancel(context.Background())

	r.cancel() // always cancel the previous reader
	r.cancel = cancel
	r.version++

	r.join.Add(len(offsetsByPartition))
	for key, offset := range offsetsByPartition {
		go func(ctx context.Context, key topicPartition, offset int64, join *sync.WaitGroup) {
			defer join.Done()

			(&reader{
				dialer:           r.config.Dialer,
				logger:           r.config.Logger,
				errorLogger:      r.config.ErrorLogger,
				brokers:          r.config.Brokers,
				topic:            key.topic,
				partition:        int(key.partition),
				minBytes:         r.config.MinBytes,
				maxBytes:         r.config.MaxBytes,
				maxWait:          r.config.MaxWait,
				readBatchTimeout: r.config.ReadBatchTimeout,
				backoffDelayMin:  r.config.ReadBackoffMin,
				backoffDelayMax:  r.config.ReadBackoffMax,
				version:          r.version,
				msgs:             r.msgs,
				stats:            r.stats,
				isolationLevel:   r.config.IsolationLevel,
				maxAttempts:      r.config.MaxAttempts,

				// backwards-compatibility flags
				offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
			}).run(ctx, offset)
		}(ctx, key, offset, &r.join)
	}
}

// A reader reads messages from kafka and produces them on its channels, it's
// used as a way to asynchronously fetch messages while the main program reads
// them using the high level reader API.
type reader struct {
	dialer           *Dialer
	logger           Logger
	errorLogger      Logger
	brokers          []string
	topic            string
	partition        int
	minBytes         int
	maxBytes         int
	maxWait          time.Duration
	readBatchTimeout time.Duration
	backoffDelayMin  time.Duration
	backoffDelayMax  time.Duration
	version          int64
	msgs             chan<- readerMessage
	stats            *readerStats
	isolationLevel   IsolationLevel
	maxAttempts      int

	offsetOutOfRangeError bool
}

type readerMessage struct {
	version   int64
	message   Message
	watermark int64
	error     error
}

func (r *reader) run(ctx context.Context, offset int64) {
	// This is the reader's main loop, it only ends if the context is canceled
	// and will keep attempting to read messages otherwise.
	//
	// Retrying indefinitely has the nice side effect of preventing Read calls
	// on the parent reader to block if connection to the kafka server fails,
	// the reader keeps reporting errors on the error channel which will then
	// be surfaced to the program.
	// If the reader wasn't retrying then the program would block indefinitely
	// on a Read call after reading the first error.
	for attempt := 0; true; attempt++ {
		if attempt != 0 {
			if !sleep(ctx, backoff(attempt, r.backoffDelayMin, r.backoffDelayMax)) {
				return
			}
		}

		r.withLogger(func(log Logger) {
			log.Printf("initializing kafka reader for partition %d of %s starting at offset %d", r.partition, r.topic, toHumanOffset(offset))
		})

		conn, start, err := r.initialize(ctx, offset)
		if err != nil {
			if errors.Is(err, OffsetOutOfRange) {
				if r.offsetOutOfRangeError {
					r.sendError(ctx, err)
					return
				}

				// This would happen if the requested offset is past the last
				// offset on the partition leader. In that case we're just going
				// to retry later hoping that enough data has been produced.
				r.withErrorLogger(func(log Logger) {
					log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err)
				})

				continue
			}

			// Perform a configured number of attempts before
			// reporting first errors, this helps mitigate
			// situations where the kafka server is temporarily
			// unavailable.
			if attempt >= r.maxAttempts {
				r.sendError(ctx, err)
			} else {
				r.stats.errors.observe(1)
				r.withErrorLogger(func(log Logger) {
					log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err)
				})
			}
			continue
		}

		// Resetting the attempt counter ensures that if a failure occurs after
		// a successful initialization we don't keep increasing the backoff
		// timeout.
		attempt = 0

		// Now we're sure to have an absolute offset number, may anything happen
		// to the connection we know we'll want to restart from this offset.
		offset = start

		errcount := 0
	readLoop:
		for {
			if !sleep(ctx, backoff(errcount, r.backoffDelayMin, r.backoffDelayMax)) {
				conn.Close()
				return
			}

			offset, err = r.read(ctx, offset, conn)
			switch {
			case err == nil:
				errcount = 0
				continue

			case errors.Is(err, io.EOF):
				// done with this batch of messages...carry on. note that this
				// block relies on the batch repackaging real io.EOF errors as
				// io.ErrUnexpectedEOF. otherwise, we would end up swallowing real
				// errors here.
				errcount = 0
				continue

			case errors.Is(err, io.ErrNoProgress):
				// This error is returned by the Conn when it believes the connection
				// has been corrupted, so we need to explicitly close it. Since we are
				// explicitly handling it and a retry will pick up, we can suppress the
				// error metrics and logs for this case.
				conn.Close()
				break readLoop

			case errors.Is(err, UnknownTopicOrPartition):
				r.withErrorLogger(func(log Logger) {
					log.Printf("failed to read from current broker %v for partition %d of %s at offset %d: %v", r.brokers, r.partition, r.topic, toHumanOffset(offset), err)
				})

				conn.Close()

				// The next call to .initialize will re-establish a connection to the proper
				// topic/partition broker combo.
				r.stats.rebalances.observe(1)
				break readLoop

			case errors.Is(err, NotLeaderForPartition):
				r.withErrorLogger(func(log Logger) {
					log.Printf("failed to read from current broker for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err)
				})

				conn.Close()

				// The next call to .initialize will re-establish a connection to the proper
				// partition leader.
				r.stats.rebalances.observe(1)
				break readLoop

			case errors.Is(err, RequestTimedOut):
				// Timeout on the kafka side, this can be safely retried.
				errcount = 0
				r.withLogger(func(log Logger) {
					log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err)
				})
				r.stats.timeouts.observe(1)
				continue

			case errors.Is(err, OffsetOutOfRange):
				first, last, err := r.readOffsets(conn)
				if err != nil {
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader got an error while attempting to determine whether it was reading before the first offset or after the last offset of partition %d of %s: %s", r.partition, r.topic, err)
					})

					conn.Close()
					break readLoop
				}

				switch {
				case offset < first:
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader is reading before the first offset for partition %d of %s, skipping from offset %d to %d (%d messages)", r.partition, r.topic, toHumanOffset(offset), first, first-offset)
					})
					offset, errcount = first, 0
					continue // retry immediately so we don't keep falling behind due to the backoff

				case offset < last:
					errcount = 0
					continue // more messages have already become available, retry immediately

				default:
					// We may be reading past the last offset, will retry later.
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader is reading past the last offset for partition %d of %s at offset %d", r.partition, r.topic, toHumanOffset(offset))
					})
				}

			case errors.Is(err, context.Canceled):
				// Another reader has taken over, we can safely quit.
				conn.Close()
				return

			case errors.Is(err, errUnknownCodec):
				// The compression codec is either unsupported or has not been
				// imported. This is a fatal error b/c the reader cannot
				// proceed.
				r.sendError(ctx, err)
				break readLoop

			default:
				var kafkaError Error
				if errors.As(err, &kafkaError) {
					r.sendError(ctx, err)
				} else {
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d: %s", r.partition, r.topic, toHumanOffset(offset), err)
					})
					r.stats.errors.observe(1)
					conn.Close()
					break readLoop
				}
			}

			errcount++
		}
	}
}

func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, start int64, err error) {
	for i := 0; i != len(r.brokers) && conn == nil; i++ {
		broker := r.brokers[i]
		var first, last int64

		t0 := time.Now()
		conn, err = r.dialer.DialLeader(ctx, "tcp", broker, r.topic, r.partition)
		t1 := time.Now()
		r.stats.dials.observe(1)
		r.stats.dialTime.observeDuration(t1.Sub(t0))

		if err != nil {
			continue
		}

		if first, last, err = r.readOffsets(conn); err != nil {
			conn.Close()
			conn = nil
			break
		}

		switch {
		case offset == FirstOffset:
			offset = first
		case offset == LastOffset:
			offset = last
		case offset < first:
			offset = first
		}

		r.withLogger(func(log Logger) {
			log.Printf("the kafka reader for partition %d of %s is seeking to offset %d", r.partition, r.topic, toHumanOffset(offset))
		})

		if start, err = conn.Seek(offset, SeekAbsolute); err != nil {
			conn.Close()
			conn = nil
			break
		}

		conn.SetDeadline(time.Time{})
	}

	return
}

func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
	r.stats.fetches.observe(1)
	r.stats.offset.observe(offset)

	t0 := time.Now()
	conn.SetReadDeadline(t0.Add(r.maxWait))

	batch := conn.ReadBatchWith(ReadBatchConfig{
		MinBytes:       r.minBytes,
		MaxBytes:       r.maxBytes,
		IsolationLevel: r.isolationLevel,
	})
	highWaterMark := batch.HighWaterMark()

	t1 := time.Now()
	r.stats.waitTime.observeDuration(t1.Sub(t0))

	var msg Message
	var err error
	var size int64
	var bytes int64

	for {
		conn.SetReadDeadline(time.Now().Add(r.readBatchTimeout))

		if msg, err = batch.ReadMessage(); err != nil {
			batch.Close()
			break
		}

		n := int64(len(msg.Key) + len(msg.Value))
		r.stats.messages.observe(1)
		r.stats.bytes.observe(n)

		if err = r.sendMessage(ctx, msg, highWaterMark); err != nil {
			batch.Close()
			break
		}

		offset = msg.Offset + 1
		r.stats.offset.observe(offset)
		r.stats.lag.observe(highWaterMark - offset)

		size++
		bytes += n
	}

	conn.SetReadDeadline(time.Time{})

	t2 := time.Now()
	r.stats.readTime.observeDuration(t2.Sub(t1))
	r.stats.fetchSize.observe(size)
	r.stats.fetchBytes.observe(bytes)

	return offset, err
}

func (r *reader) readOffsets(conn *Conn) (first, last int64, err error) {
	conn.SetDeadline(time.Now().Add(10 * time.Second))
	return conn.ReadOffsets()
}

func (r *reader) sendMessage(ctx context.Context, msg Message, watermark int64) error {
	select {
	case r.msgs <- readerMessage{version: r.version, message: msg, watermark: watermark}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (r *reader) sendError(ctx context.Context, err error) error {
	select {
	case r.msgs <- readerMessage{version: r.version, error: err}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (r *reader) withLogger(do func(Logger)) {
	if r.logger != nil {
		do(r.logger)
	}
}

func (r *reader) withErrorLogger(do func(Logger)) {
	if r.errorLogger != nil {
		do(r.errorLogger)
	} else {
		r.withLogger(do)
	}
}

// extractTopics returns the unique list of topics represented by the set of
// provided members.
func extractTopics(members []GroupMember) []string {
	visited := map[string]struct{}{}
	var topics []string

	for _, member := range members {
		for _, topic := range member.Topics {
			if _, seen := visited[topic]; seen {
				continue
			}

			topics = append(topics, topic)
			visited[topic] = struct{}{}
		}
	}

	sort.Strings(topics)

	return topics
}

type humanOffset int64

func toHumanOffset(v int64) humanOffset {
	return humanOffset(v)
}

func (offset humanOffset) Format(w fmt.State, _ rune) {
	v := int64(offset)
	switch v {
	case FirstOffset:
		fmt.Fprint(w, "first offset")
	case LastOffset:
		fmt.Fprint(w, "last offset")
	default:
		fmt.Fprint(w, strconv.FormatInt(v, 10))
	}
}