cmd/queuebench/bench.go (115 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package main
import (
"context"
"fmt"
"log"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
apmqueue "github.com/elastic/apm-queue/v2"
"github.com/elastic/apm-queue/v2/kafka"
)
type bench struct {
Brokers []string
ConsumerGroupID string
Logger *zap.Logger
Partitions int
TopicNamespace string
Topics []apmqueue.Topic
mp metric.MeterProvider
tp trace.TracerProvider
c *kafka.Consumer
m *kafka.Manager
p *kafka.Producer
}
func (b *bench) Setup(ctx context.Context) error {
kafkaCommonCfg := kafka.CommonConfig{
Brokers: b.Brokers,
Namespace: b.TopicNamespace,
Logger: b.Logger,
TracerProvider: b.tp,
MeterProvider: b.mp,
}
mngrCfg := kafka.ManagerConfig{
CommonConfig: kafkaCommonCfg,
}
mngrCfg.CommonConfig.ClientID = "queuebench-manager"
mngrCfg.CommonConfig.Logger = b.Logger.With(zap.String("role", "manager"))
mngr, err := kafka.NewManager(mngrCfg)
if err != nil {
return fmt.Errorf("cannot create kafka manager: %w", err)
}
b.m = mngr
if err = b.m.Healthy(ctx); err != nil {
return fmt.Errorf("cluster health check failed: %w", err)
}
log.Println("cluster confirmed healthy")
log.Printf("creating kafka topics: %v", b.Topics)
topicsCfg := kafka.TopicCreatorConfig{
PartitionCount: b.Partitions,
}
if err = createTopics(ctx, b.m, topicsCfg, b.Topics); err != nil {
return fmt.Errorf("cannot create topics: %w", err)
}
consumer, err := createConsumer(kafkaCommonCfg, b.Topics, b.ConsumerGroupID)
if err != nil {
return fmt.Errorf("cannot create consumer: %w", err)
}
b.c = consumer
producer, err := createProducer(kafkaCommonCfg)
if err != nil {
return fmt.Errorf("cannot create producer: %w", err)
}
b.p = producer
return nil
}
func (b *bench) Teardown(ctx context.Context) error {
log.Printf("deleting benchmark kafka topics: %v", b.Topics)
if err := deleteTopics(ctx, b.m, b.Topics); err != nil {
return fmt.Errorf("teardown not completed: %w", err)
}
return nil
}
func createTopics(ctx context.Context, mngr *kafka.Manager, cfg kafka.TopicCreatorConfig, topics []apmqueue.Topic) error {
creator, err := mngr.NewTopicCreator(cfg)
if err != nil {
return fmt.Errorf("cannot instantiate topic creator: %w", err)
}
for _, topic := range topics {
err = creator.CreateTopics(ctx, topic)
if err != nil {
return fmt.Errorf("cannot create topics: %w", err)
}
}
return nil
}
func deleteTopics(ctx context.Context, mngr *kafka.Manager, topics []apmqueue.Topic) error {
err := mngr.DeleteTopics(ctx, topics...)
if err != nil {
return fmt.Errorf("cannot delete topics: %w", err)
}
return nil
}
type dummyProcessor struct{}
func (d dummyProcessor) Process(context.Context, apmqueue.Record) error {
return nil
}
func createConsumer(commonCfg kafka.CommonConfig, topics []apmqueue.Topic, groupID string) (*kafka.Consumer, error) {
cfg := kafka.ConsumerConfig{
CommonConfig: commonCfg,
GroupID: groupID,
Processor: dummyProcessor{},
Topics: topics,
}
cfg.CommonConfig.ClientID = "queuebench-consumer"
cfg.CommonConfig.Logger = commonCfg.Logger.With(zap.String("role", "consumer"))
return kafka.NewConsumer(cfg)
}
func createProducer(commonCfg kafka.CommonConfig) (*kafka.Producer, error) {
cfg := kafka.ProducerConfig{
CommonConfig: commonCfg,
}
cfg.CommonConfig.ClientID = "queuebench-producer"
cfg.CommonConfig.Logger = commonCfg.Logger.With(zap.String("role", "producer"))
return kafka.NewProducer(cfg)
}