func CreateKafkaTopics()

in systemtest/infra_kafka.go [160:177]


func CreateKafkaTopics(ctx context.Context, t testing.TB, partitions int, topics ...apmqueue.Topic) {
	manager := NewKafkaManager(t)
	topicCreator, err := manager.NewTopicCreator(kafka.TopicCreatorConfig{
		PartitionCount: partitions,
		TopicConfigs: map[string]string{
			"retention.ms": strconv.FormatInt(time.Hour.Milliseconds(), 10),
		},
	})
	require.NoError(t, err)

	err = topicCreator.CreateTopics(ctx, topics...)
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, manager.DeleteTopics(
			context.Background(), topics...,
		))
	})
}