in systemtest/providers.go [87:112]
// kafkaTypes provisions the Kafka topics for a test and returns a producer
// and a consumer wired to them.
//
// The settings start from defaultCfg and are adjusted by each option in opts,
// in the order given. Topic creation is bounded by defaultProvisionTimeout;
// the context is released when kafkaTypes returns. The consumer uses the
// test name as its group ID.
func kafkaTypes(t testing.TB, opts ...option) (apmqueue.Producer, apmqueue.Consumer) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultProvisionTimeout)
	defer cancel()

	// Apply the caller's overrides on top of the defaults.
	settings := defaultCfg
	for i := range opts {
		opts[i](&settings)
	}

	log := settings.loggerF(t)
	topicNames := settings.topicsF(t)

	// Topics must exist before the producer and consumer are constructed.
	CreateKafkaTopics(ctx, t, settings.partitions, topicNames...)

	producerCfg := kafka.ProducerConfig{
		CommonConfig: kafka.CommonConfig{Logger: log.Named("producer")},
		Sync:         settings.sync,
	}
	producer := newKafkaProducer(t, producerCfg)

	consumerCfg := kafka.ConsumerConfig{
		CommonConfig: kafka.CommonConfig{Logger: log.Named("consumer")},
		Topics:       topicNames,
		GroupID:      t.Name(),
		Processor:    settings.processor,
		Delivery:     settings.dt,
	}
	consumer := newKafkaConsumer(t, consumerCfg)

	return producer, consumer
}