in kafka/consumer.go [515:550]
// processFetch hands the records of every non-empty fetched partition to the
// partition consumer registered for it in c.assignments.
//
// The assignments map is read under c.mu's read lock, which is held for the
// whole iteration. Fetches without records are ignored before the lock is
// taken. When no partition consumer is registered for a fetched
// topic/partition, the records are not processed; with at-most-once delivery
// this is reported as a data-loss warning, with any other delivery type the
// partition is skipped silently.
func (c *consumer) processFetch(fetches kgo.Fetches) {
	if fetches.NumRecords() == 0 {
		return
	}

	c.mu.RLock()
	defer c.mu.RUnlock()

	fetches.EachPartition(func(ftp kgo.FetchTopicPartition) {
		if len(ftp.Records) == 0 {
			return
		}

		key := topicPartition{topic: ftp.Topic, partition: ftp.Partition}
		if pc, found := c.assignments[key]; found {
			pc.consumeRecords(ftp)
			return
		}

		// NOTE(marclop) While possible, this is unlikely to happen given the
		// locking that's in place in the caller.
		if c.delivery != apmqueue.AtMostOnceDeliveryType {
			return
		}

		// Log the topic without the configured prefix.
		topic := strings.TrimPrefix(ftp.Topic, c.topicPrefix)
		warnLog := c.logger
		if c.logFieldFn != nil {
			warnLog = warnLog.With(c.logFieldFn(topic))
		}

		revokedErr := errors.New(
			"attempted to process records for revoked partition",
		)
		// NOTE(review): the "offset" field carries the partition's high
		// watermark rather than the offset of a fetched record; confirm this
		// is the intended value.
		warnLog.Warn(
			"data loss: failed to send records to process after commit",
			zap.Error(revokedErr),
			zap.String("topic", topic),
			zap.Int32("partition", ftp.Partition),
			zap.Int64("offset", ftp.HighWatermark),
			zap.Int("records", len(ftp.Records)),
		)
	})
}