func()

in cmd/queuebench/bench.go [49:100]


func (b *bench) Setup(ctx context.Context) error {
	kafkaCommonCfg := kafka.CommonConfig{
		Brokers:   b.Brokers,
		Namespace: b.TopicNamespace,

		Logger:         b.Logger,
		TracerProvider: b.tp,
		MeterProvider:  b.mp,
	}

	mngrCfg := kafka.ManagerConfig{
		CommonConfig: kafkaCommonCfg,
	}
	mngrCfg.CommonConfig.ClientID = "queuebench-manager"
	mngrCfg.CommonConfig.Logger = b.Logger.With(zap.String("role", "manager"))
	mngr, err := kafka.NewManager(mngrCfg)
	if err != nil {
		return fmt.Errorf("cannot create kafka manager: %w", err)
	}

	b.m = mngr

	if err = b.m.Healthy(ctx); err != nil {
		return fmt.Errorf("cluster health check failed: %w", err)
	}

	log.Println("cluster confirmed healthy")

	log.Printf("creating kafka topics: %v", b.Topics)
	topicsCfg := kafka.TopicCreatorConfig{
		PartitionCount: b.Partitions,
	}
	if err = createTopics(ctx, b.m, topicsCfg, b.Topics); err != nil {
		return fmt.Errorf("cannot create topics: %w", err)
	}

	consumer, err := createConsumer(kafkaCommonCfg, b.Topics, b.ConsumerGroupID)
	if err != nil {
		return fmt.Errorf("cannot create consumer: %w", err)
	}

	b.c = consumer

	producer, err := createProducer(kafkaCommonCfg)
	if err != nil {
		return fmt.Errorf("cannot create producer: %w", err)
	}

	b.p = producer

	return nil
}