func()

in kafka/consumer.go [592:642]


// consumeRecords submits the records fetched for one topic partition to the
// consumer's goroutine group (c.g), where they are handed to c.processor one
// at a time, in the order they appear in ftp.Records.
//
// Each record's headers are converted into string metadata and attached to
// the record's context before processing. Processing errors are logged and
// never retried; the submitted function always returns nil.
//
// When the delivery type is apmqueue.AtLeastOnceDeliveryType, the record at
// the highest index that was processed successfully is committed once the
// whole batch has been iterated. A failed record does not move that index,
// but a later success in the same batch does, so the offset of a failed
// record can still end up committed.
// See https://github.com/elastic/apm-queue/issues/118.
func (c *pc) consumeRecords(ftp kgo.FetchTopicPartition) {
	c.g.Go(func() error {
		records := ftp.Records

		// Index of the most recent record eligible for commit. It stays
		// negative until at least one record qualifies.
		lastIdx := -1
		for idx, rec := range records {
			// Flatten the Kafka headers into the metadata map that is
			// propagated through the processing context.
			headers := make(map[string]string, len(rec.Headers))
			for _, hdr := range rec.Headers {
				headers[hdr.Key] = string(hdr.Value)
			}

			err := c.processor.Process(
				queuecontext.WithMetadata(rec.Context, headers),
				apmqueue.Record{
					Topic:       c.topic,
					Partition:   rec.Partition,
					OrderingKey: rec.Key,
					Value:       rec.Value,
				},
			)
			if err != nil {
				// No retry is attempted, so the record may be lost.
				c.logger.Error("data loss: unable to process event",
					zap.Error(err),
					zap.Int64("offset", rec.Offset),
					zap.Any("headers", headers),
				)
				// Under at-least-once delivery a failed record must not
				// advance the commit index.
				if c.delivery == apmqueue.AtLeastOnceDeliveryType {
					continue
				}
			}
			lastIdx = idx
		}

		// Nothing to commit unless at-least-once delivery is configured and
		// at least one record qualified.
		if c.delivery != apmqueue.AtLeastOnceDeliveryType || lastIdx < 0 {
			return nil
		}

		// lastIdx >= 0 guarantees the batch is non-empty, so indexing is safe
		// and no further length check is needed before logging success.
		toCommit := records[lastIdx]
		if err := c.client.CommitRecords(c.ctx, toCommit); err != nil {
			c.logger.Error("unable to commit records",
				zap.Error(err),
				zap.Int64("offset", toCommit.Offset),
			)
			return nil
		}
		c.logger.Info("committed",
			zap.Int64("offset", toCommit.Offset),
		)
		return nil
	})
}