in cmd/queuebench/main.go [40:184]
func main() {
	// NOTE: intercept any panic and terminate gracefully.
	// This allows using log.Panic in main, which triggers
	// deferred functions (whereas log.Fatal doesn't).
	defer func() {
		if r := recover(); r != nil {
			log.Fatal(r)
		}
	}()
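	// Parse CLI flags, then set up the logger and the OTel MeterProvider
	// whose reader is later used to count produced/consumed messages.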
	cfg := config{}
	cfg.Parse()
	log.Printf("parsed config: %+v\n", cfg)

	log.Println("prep logger")
	var err error
	logger := logging(cfg.verbose)

	log.Println("prep MeterProvider")
	mp, rdr := metering()

	ctx := context.Background()
	ctx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
	defer stop()

	run := time.Now().Unix()
	log.Printf("running bench run: %d", run)
	bench := bench{
		Brokers:         []string{cfg.broker},
		ConsumerGroupID: fmt.Sprintf("queuebench-%d", run),
		Logger:          logger,
		Partitions:      cfg.partitions,
		TopicNamespace:  namespace,
		Topics: []apmqueue.Topic{
			apmqueue.Topic(fmt.Sprintf("run-%d", run)),
		},
		mp: mp,
		tp: noop.NewTracerProvider(),
	}
log.Println("running benchmark setup")
if err = bench.Setup(ctx); err != nil {
log.Panicf("benchmark setup failed: %s", err)
}
teardown := func() {
log.Println("running benchmark teardown")
// NOTE: using a different context to prevent this function
// being affected by main context being closed
if err := bench.Teardown(context.Background()); err != nil {
log.Panicf("benchmark teardown failed: %s", err)
}
}
defer teardown()
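	// The consumer runs concurrently in its own goroutine while this
	// goroutine drives the producer for the configured duration.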
	start := time.Now()
	log.Println("==> running benchmark")

	log.Println("start consuming")
	var consumptionduration time.Duration
	go func() {
		defer func() {
			if r := recover(); r != nil {
				log.Panicf("consumer loop panicked: %s", r)
			}
		}()
		consumptionstart := time.Now()
		if err := bench.c.Run(ctx); err != nil {
			log.Printf("consumer run ended with an error: %s", err)
		}
		consumptionduration = time.Since(consumptionstart)
	}()
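	// Drive the producer from this goroutine; produce blocks for (up to)
	// the configured duration.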
log.Printf("start producing, will produce for %s", cfg.duration)
productionstart := time.Now()
if err := produce(ctx, bench.p, bench.Topics[0], cfg.eventSize, cfg.duration); err != nil {
log.Printf("error while producing records: %s", err)
}
productionduration := time.Since(productionstart)
log.Println("production ended")
log.Println("waiting for consumer to fetch all records")
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
log.Printf("timeout set to: %s", time.Now().Add(cfg.timeout))
timer := time.NewTimer(cfg.timeout)
defer timer.Stop()
var rm metricdata.ResourceMetrics
totalproduced := int64(0)
totalconsumed := int64(0)
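	// Poll the metrics reader every 100ms and compare the producer and
	// consumer counters; stop once consumed >= produced or the timeout fires.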
wait:
	for {
		select {
		case <-ctx.Done():
			log.Panic("context closed, terminating execution")
		case <-timer.C:
			log.Println("reached timeout, moving on")
			break wait
		case <-ticker.C:
			if err := rdr.Collect(ctx, &rm); err != nil {
				// NOTE: consider any error here as transient and don't trigger termination
				log.Printf("cannot collect otel metrics: %s", err)
				continue
			}
			totalproduced = getSumInt64Metric("github.com/elastic/apm-queue/kafka", "producer.messages.produced", rm)
			totalconsumed = getSumInt64Metric("github.com/elastic/apm-queue/kafka", "consumer.messages.fetched", rm)
			if totalconsumed >= totalproduced {
				log.Println("consumption ended")
				break wait
			}
		}
	}
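	// Consumption has caught up (or the timeout expired): close both
	// clients before collecting the final metrics snapshot.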
	if err := bench.c.Close(); err != nil {
		log.Panicf("error closing consumer: %s", err)
	}
log.Println("==> benchmark ")
	if err := bench.p.Close(); err != nil {
		log.Panicf("error closing producer: %s", err)
	}

	duration := time.Since(start)
	log.Printf("it took %s (-duration=%s)", duration, cfg.duration)
	log.Printf("time spent producing: %s", productionduration)
	log.Printf("time spent consuming: %s", consumptionduration)
	log.Printf("total produced/consumed: %d/%d", totalproduced, totalconsumed)
log.Println("collecting metrics")
rdr.Collect(context.Background(), &rm)
if err = display(rm); err != nil {
log.Panicf("failed displaying metrics: %s", err)
}
if totalproduced != totalconsumed {
log.Panicf("total produced and consumed don't match: %d vs %d", totalproduced, totalconsumed)
}
log.Println("bench run completed successfully")
}