in kafka/consumer.go [570:588]
// newPartitionConsumer returns a pointer to a pc populated with the supplied
// context, client, processor, delivery type and logger, plus the topic name
// converted to apmqueue.Topic.
//
// Before returning, the limit on the consumer's g field is set to 1 so that
// calls to processor.Process are serialized instead of overlapping.
func newPartitionConsumer(
	ctx context.Context,
	client *kgo.Client,
	processor apmqueue.Processor,
	delivery apmqueue.DeliveryType,
	topic string,
	logger *zap.Logger,
) *pc {
	consumer := &pc{
		ctx:       ctx,
		client:    client,
		logger:    logger,
		topic:     apmqueue.Topic(topic),
		delivery:  delivery,
		processor: processor,
	}
	// A limit of one keeps processor.Process invocations strictly sequential.
	consumer.g.SetLimit(1)
	return consumer
}