func()

in kafka/consumer.go [515:550]


// processFetch hands the records of every fetched partition to the partition
// consumer registered for it in c.assignments.
//
// The call is a no-op when fetches holds no records. Otherwise c.mu is
// read-locked for the whole iteration and partitions without records are
// skipped. When no consumer is registered for a partition that does carry
// records, those records are not passed on: for the at-most-once delivery
// type this is reported with a warning log entry, for any other delivery type
// nothing is logged here.
func (c *consumer) processFetch(fetches kgo.Fetches) {
	if fetches.NumRecords() == 0 {
		return
	}

	c.mu.RLock()
	defer c.mu.RUnlock()

	dispatch := func(ftp kgo.FetchTopicPartition) {
		count := len(ftp.Records)
		if count == 0 {
			return
		}

		key := topicPartition{topic: ftp.Topic, partition: ftp.Partition}
		if pc, assigned := c.assignments[key]; assigned {
			pc.consumeRecords(ftp)
			return
		}

		// No consumer is registered for this partition. Per the original
		// author's note this is possible but unlikely, given the locking
		// that is in place in the caller. Only at-most-once delivery warns.
		if c.delivery != apmqueue.AtMostOnceDeliveryType {
			return
		}

		// The topic is logged without the configured prefix.
		name := strings.TrimPrefix(ftp.Topic, c.topicPrefix)
		lg := c.logger
		if c.logFieldFn != nil {
			lg = lg.With(c.logFieldFn(name))
		}
		revokedErr := errors.New(
			"attempted to process records for revoked partition",
		)
		lg.Warn(
			"data loss: failed to send records to process after commit",
			zap.Error(revokedErr),
			zap.String("topic", name),
			zap.Int32("partition", ftp.Partition),
			// The "offset" field carries the partition's high watermark.
			zap.Int64("offset", ftp.HighWatermark),
			zap.Int("records", count),
		)
	}
	fetches.EachPartition(dispatch)
}