func()

in kafka/consumer.go [364:409]


func (c *Consumer) fetch(ctx context.Context) error {
	fetches := c.client.PollRecords(ctx, c.cfg.MaxPollRecords)
	defer c.client.AllowRebalance()

	if fetches.IsClientClosed() ||
		errors.Is(fetches.Err0(), context.Canceled) ||
		errors.Is(fetches.Err0(), context.DeadlineExceeded) {
		return context.Canceled
	}
	c.mu.RLock()
	defer c.mu.RUnlock()
	switch c.cfg.Delivery {
	case apmqueue.AtLeastOnceDeliveryType:
		// Committing the processed records happens on each partition consumer.
	case apmqueue.AtMostOnceDeliveryType:
		// Commit the fetched record offsets as soon as we've polled them.
		if err := c.client.CommitUncommittedOffsets(ctx); err != nil {
			logger := c.cfg.Logger
			logger.Error("consumer commit offsets returned error", zap.Error(err))
			// NOTE(marclop): If the commit fails with an unrecoverable error,
			// return it and terminate the consumer. This will avoid potentially
			// processing records twice, and it's up to the consumer to re-start
			// the consumer.
			return ErrCommitFailed
		}
		// Allow re-balancing now that we have committed offsets, preventing
		// another consumer from reprocessing the records.
		c.client.AllowRebalance()
	}
	fetches.EachError(func(t string, p int32, err error) {
		topicName := strings.TrimPrefix(t, c.consumer.topicPrefix)
		logger := c.cfg.Logger
		if c.cfg.TopicLogFieldFunc != nil {
			logger = logger.With(c.cfg.TopicLogFieldFunc(topicName))
		}

		logger.Error(
			"consumer fetches returned error",
			zap.Error(err),
			zap.String("topic", topicName),
			zap.Int32("partition", p),
		)
	})
	c.consumer.processFetch(fetches)
	return nil
}