in kafka/consumer.go [592:642]
// consumeRecords processes the records of a fetched topic partition on a
// goroutine managed by the consumer's errgroup. Records that fail processing
// are logged as data loss and are not retried
// (https://github.com/elastic/apm-queue/issues/118). When the delivery
// guarantee is AtLeastOnceDeliveryType, the offset of the last processed
// record is committed once the whole batch has been consumed.
func (c *pc) consumeRecords(ftp kgo.FetchTopicPartition) {
	c.g.Go(func() error {
		// Index of the last record considered processed. -1 means no record
		// was processed (empty batch, or every record failed under
		// at-least-once delivery), in which case nothing is committed.
		last := -1
		for i, msg := range ftp.Records {
			// Copy the Kafka record headers into queue metadata so the
			// processor sees them via the context.
			meta := make(map[string]string, len(msg.Headers))
			for _, h := range msg.Headers {
				meta[h.Key] = string(h.Value)
			}
			processCtx := queuecontext.WithMetadata(msg.Context, meta)
			record := apmqueue.Record{
				Topic:       c.topic,
				Partition:   msg.Partition,
				OrderingKey: msg.Key,
				Value:       msg.Value,
			}
			// If a record can't be processed, no retries are attempted and it
			// may be lost. https://github.com/elastic/apm-queue/issues/118.
			if err := c.processor.Process(processCtx, record); err != nil {
				c.logger.Error("data loss: unable to process event",
					zap.Error(err),
					zap.Int64("offset", msg.Offset),
					zap.Any("headers", meta),
				)
				// Under at-least-once delivery, don't mark the failed record
				// as processed so its offset isn't committed below. NOTE: a
				// later successful record can still advance `last` past it
				// (the known data-loss case tracked in issue #118).
				if c.delivery == apmqueue.AtLeastOnceDeliveryType {
					continue
				}
			}
			last = i
		}
		// Commit the last processed record's offset when one or more records
		// were processed and the delivery guarantee is AtLeastOnceDeliveryType.
		// last >= 0 already guarantees ftp.Records is non-empty.
		if c.delivery == apmqueue.AtLeastOnceDeliveryType && last >= 0 {
			lastRecord := ftp.Records[last]
			if err := c.client.CommitRecords(c.ctx, lastRecord); err != nil {
				c.logger.Error("unable to commit records",
					zap.Error(err),
					zap.Int64("offset", lastRecord.Offset),
				)
			} else {
				c.logger.Info("committed",
					zap.Int64("offset", lastRecord.Offset),
				)
			}
		}
		return nil
	})
}