in systemtest/infra_kafka.go [160:177]
// CreateKafkaTopics creates the given topics with the requested number of
// partitions and a "retention.ms" topic config of one hour, failing the test
// immediately if the topic creator cannot be built or creation fails.
//
// A cleanup function is registered on t that deletes the same topics when the
// test finishes; a deletion error fails the test.
func CreateKafkaTopics(ctx context.Context, t testing.TB, partitions int, topics ...apmqueue.Topic) {
	// Mark this function as a helper so failures are attributed to the
	// calling test rather than to this shared setup code.
	t.Helper()

	manager := NewKafkaManager(t)
	topicCreator, err := manager.NewTopicCreator(kafka.TopicCreatorConfig{
		PartitionCount: partitions,
		TopicConfigs: map[string]string{
			// Topic retention is expressed in milliseconds, base 10.
			"retention.ms": strconv.FormatInt(time.Hour.Milliseconds(), 10),
		},
	})
	require.NoError(t, err)

	require.NoError(t, topicCreator.CreateTopics(ctx, topics...))

	t.Cleanup(func() {
		t.Helper()
		// Deletion uses a fresh background context instead of ctx,
		// presumably because ctx may already be cancelled by the time
		// cleanup runs. NOTE(review): confirm against callers.
		require.NoError(t, manager.DeleteTopics(
			context.Background(), topics...,
		))
	})
}