func kafkaTypes()

in systemtest/providers.go [87:112]


// kafkaTypes returns a Kafka-backed producer and consumer for the test t.
//
// The configuration starts from defaultCfg and is adjusted by each of opts,
// applied in order. The topics yielded by the configured topicsF are created
// through CreateKafkaTopics with the configured partition count before
// either client is built. The consumer is subscribed to those same topics
// and uses t.Name() as its group ID.
func kafkaTypes(t testing.TB, opts ...option) (apmqueue.Producer, apmqueue.Consumer) {
	// This context is only handed to CreateKafkaTopics below; it is released
	// when the function returns.
	ctx, cancel := context.WithTimeout(context.Background(), defaultProvisionTimeout)
	defer cancel()

	settings := defaultCfg
	for i := range opts {
		opts[i](&settings)
	}

	baseLogger := settings.loggerF(t)
	topicList := settings.topicsF(t)
	CreateKafkaTopics(ctx, t, settings.partitions, topicList...)

	// Each client gets its own named child logger.
	producerLogger := baseLogger.Named("producer")
	producer := newKafkaProducer(t, kafka.ProducerConfig{
		CommonConfig: kafka.CommonConfig{Logger: producerLogger},
		Sync:         settings.sync,
	})

	consumerLogger := baseLogger.Named("consumer")
	groupID := t.Name()
	consumer := newKafkaConsumer(t, kafka.ConsumerConfig{
		CommonConfig: kafka.CommonConfig{Logger: consumerLogger},
		Topics:       topicList,
		GroupID:      groupID,
		Processor:    settings.processor,
		Delivery:     settings.dt,
	})

	return producer, consumer
}