in cmd/queuebench/bench.go [49:100]
func (b *bench) Setup(ctx context.Context) error {
kafkaCommonCfg := kafka.CommonConfig{
Brokers: b.Brokers,
Namespace: b.TopicNamespace,
Logger: b.Logger,
TracerProvider: b.tp,
MeterProvider: b.mp,
}
mngrCfg := kafka.ManagerConfig{
CommonConfig: kafkaCommonCfg,
}
mngrCfg.CommonConfig.ClientID = "queuebench-manager"
mngrCfg.CommonConfig.Logger = b.Logger.With(zap.String("role", "manager"))
mngr, err := kafka.NewManager(mngrCfg)
if err != nil {
return fmt.Errorf("cannot create kafka manager: %w", err)
}
b.m = mngr
if err = b.m.Healthy(ctx); err != nil {
return fmt.Errorf("cluster health check failed: %w", err)
}
log.Println("cluster confirmed healthy")
log.Printf("creating kafka topics: %v", b.Topics)
topicsCfg := kafka.TopicCreatorConfig{
PartitionCount: b.Partitions,
}
if err = createTopics(ctx, b.m, topicsCfg, b.Topics); err != nil {
return fmt.Errorf("cannot create topics: %w", err)
}
consumer, err := createConsumer(kafkaCommonCfg, b.Topics, b.ConsumerGroupID)
if err != nil {
return fmt.Errorf("cannot create consumer: %w", err)
}
b.c = consumer
producer, err := createProducer(kafkaCommonCfg)
if err != nil {
return fmt.Errorf("cannot create producer: %w", err)
}
b.p = producer
return nil
}