in cmd/queuebench/bench.go [111:125]
// createTopics builds a topic creator from cfg using mngr and then asks it to
// create every entry of topics, issuing one CreateTopics call per topic in
// slice order.
//
// It returns a wrapped error if the creator cannot be instantiated, or as soon
// as any single CreateTopics call fails (remaining topics are not attempted).
// It returns nil when all calls succeed, including when topics is empty.
func createTopics(ctx context.Context, mngr *kafka.Manager, cfg kafka.TopicCreatorConfig, topics []apmqueue.Topic) error {
	tc, err := mngr.NewTopicCreator(cfg)
	if err != nil {
		return fmt.Errorf("cannot instantiate topic creator: %w", err)
	}

	for i := range topics {
		// Fail fast: the first error aborts the loop and is surfaced to the caller.
		if err := tc.CreateTopics(ctx, topics[i]); err != nil {
			return fmt.Errorf("cannot create topics: %w", err)
		}
	}
	return nil
}