in kafka/consumer.go [364:409]
func (c *Consumer) fetch(ctx context.Context) error {
	fetches := c.client.PollRecords(ctx, c.cfg.MaxPollRecords)
	// Re-allow rebalances once this poll cycle returns; they are blocked
	// while the polled records are still being processed.
	defer c.client.AllowRebalance()
	// A closed client or a canceled/expired context signals shutdown;
	// normalize both to context.Canceled so the caller can stop gracefully.
	if fetches.IsClientClosed() ||
		errors.Is(fetches.Err0(), context.Canceled) ||
		errors.Is(fetches.Err0(), context.DeadlineExceeded) {
		return context.Canceled
	}
	c.mu.RLock()
	defer c.mu.RUnlock()
	switch c.cfg.Delivery {
	case apmqueue.AtLeastOnceDeliveryType:
		// Committing the processed records happens on each partition consumer.
	case apmqueue.AtMostOnceDeliveryType:
		// Commit the fetched record offsets as soon as we've polled them.
		if err := c.client.CommitUncommittedOffsets(ctx); err != nil {
			c.cfg.Logger.Error("consumer commit offsets returned error", zap.Error(err))
			// NOTE(marclop): If the commit fails, return ErrCommitFailed and
			// terminate the consumer. This avoids potentially processing
			// records twice; it's up to the caller to restart the consumer.
			return ErrCommitFailed
		}
		// Allow rebalancing now that the offsets are committed, preventing
		// another consumer from reprocessing the records.
		c.client.AllowRebalance()
	}
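	// Log each per-partition fetch error, stripping the topic prefix so the
	// logged topic matches the logical topic name.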
	fetches.EachError(func(t string, p int32, err error) {
		topicName := strings.TrimPrefix(t, c.consumer.topicPrefix)
		logger := c.cfg.Logger
		if c.cfg.TopicLogFieldFunc != nil {
			logger = logger.With(c.cfg.TopicLogFieldFunc(topicName))
		}
		logger.Error(
			"consumer fetches returned error",
			zap.Error(err),
			zap.String("topic", topicName),
			zap.Int32("partition", p),
		)
	})
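	// Hand the fetched records off for processing. With at-least-once
	// delivery, offsets are committed by the partition consumers only after
	// processing completes.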
	c.consumer.processFetch(fetches)
	return nil
}
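
A minimal sketch of the error contract a caller is expected to honor, assuming a hypothetical runLoop driver that is not part of this file: context.Canceled means a graceful stop, while ErrCommitFailed terminates consumption and leaves the restart decision to the caller, per the NOTE above.

// Hypothetical driver illustrating how fetch's return values are meant to
// be handled; a sketch under the stated assumptions, not the actual run loop.
func (c *Consumer) runLoop(ctx context.Context) error {
	for {
		if err := c.fetch(ctx); err != nil {
			if errors.Is(err, context.Canceled) {
				// Graceful shutdown: the client was closed or ctx ended.
				return nil
			}
			// ErrCommitFailed (at-most-once delivery): stop consuming and
			// surface the error; the caller decides whether to restart.
			return err
		}
	}
}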