func newPartitionConsumer()

in kafka/consumer.go [570:588]


// newPartitionConsumer returns a pointer to a freshly built pc that carries
// the supplied context, client, processor, delivery type and logger, with
// topic converted to an apmqueue.Topic. Before the consumer is returned, its
// g field is capped at a limit of one.
func newPartitionConsumer(ctx context.Context,
	client *kgo.Client,
	processor apmqueue.Processor,
	delivery apmqueue.DeliveryType,
	topic string,
	logger *zap.Logger,
) *pc {
	consumer := &pc{
		logger:    logger,
		delivery:  delivery,
		processor: processor,
		client:    client,
		ctx:       ctx,
		topic:     apmqueue.Topic(topic),
	}
	// A limit of 1 keeps calls to processor.Process strictly serial: no two
	// of them may be in flight at the same time.
	consumer.g.SetLimit(1)
	return consumer
}