cmd/queuebench/main.go (164 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package main import ( "context" "crypto/rand" "fmt" "log" "os" "os/signal" "syscall" "time" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/trace/noop" "go.uber.org/zap" apmqueue "github.com/elastic/apm-queue/v2" "github.com/elastic/apm-queue/v2/kafka" ) const namespace = "queuebench" func main() { // NOTE: intercept any panic and terminate gracefully // This allows using log.Panic in main which triggers // deferred functions (whereas log.Fatal don't). defer func() { if r := recover(); r != nil { log.Fatal(r) } }() cfg := config{} cfg.Parse() log.Printf("parsed config: %+v\n", cfg) log.Println("prep logger") var err error logger := logging(cfg.verbose) log.Println("prep MeterProvider") mp, rdr := metering() ctx := context.Background() ctx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) defer stop() run := time.Now().Unix() log.Printf("running bench run: %d", run) bench := bench{ Brokers: []string{cfg.broker}, ConsumerGroupID: fmt.Sprintf("queuebench-%d", run), Logger: logger, Partitions: cfg.partitions, TopicNamespace: namespace, Topics: []apmqueue.Topic{ apmqueue.Topic(fmt.Sprintf("run-%d", run)), }, mp: mp, tp: noop.NewTracerProvider(), } log.Println("running benchmark setup") if err = bench.Setup(ctx); err != nil { log.Panicf("benchmark setup failed: %s", err) } teardown := func() { log.Println("running benchmark teardown") // NOTE: using a different context to prevent this function // being affected by main context being closed if err := bench.Teardown(context.Background()); err != nil { log.Panicf("benchmark teardown failed: %s", err) } } defer teardown() start := time.Now() log.Println("==> running benchmark") log.Println("start consuming") var consumptionduration time.Duration go func() { defer func() { if r := recover(); r != nil { log.Panicf("consumer loop panicked: %s", r) } }() consumptionstart := time.Now() if err := bench.c.Run(ctx); err != nil { log.Printf("consumer run ended with an error: %s", err) } consumptionduration = time.Since(consumptionstart) }() log.Printf("start producing, will produce for %s", cfg.duration) productionstart := time.Now() if err := produce(ctx, bench.p, bench.Topics[0], cfg.eventSize, cfg.duration); err != nil { log.Printf("error while producing records: %s", err) } productionduration := time.Since(productionstart) log.Println("production ended") log.Println("waiting for consumer to fetch all records") ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() log.Printf("timeout set to: %s", time.Now().Add(cfg.timeout)) timer := time.NewTimer(cfg.timeout) defer timer.Stop() var rm metricdata.ResourceMetrics totalproduced := int64(0) totalconsumed := int64(0) wait: for { select { case <-ctx.Done(): log.Panic("context closed, terminating execution") case <-timer.C: log.Println("reached timeout, moving on") break wait case <-ticker.C: if err := rdr.Collect(ctx, &rm); err != nil { // NOTE: consider any error here as transient and don't trigger termination log.Printf("cannot collect otel metrics: %s", err) continue } totalproduced = getSumInt64Metric("github.com/elastic/apm-queue/kafka", "producer.messages.produced", rm) totalconsumed = getSumInt64Metric("github.com/elastic/apm-queue/kafka", "consumer.messages.fetched", rm) if totalconsumed >= totalproduced { log.Println("consumption ended") break wait } } } if err := bench.c.Close(); err != nil { log.Panicf("error closing consumer: %s", err) } log.Println("==> benchmark ") if err := bench.p.Close(); err != nil { log.Panicf("error closing producer: %s", err) } duration := time.Since(start) log.Printf("it took %s (-duration=%s)", duration, cfg.duration) log.Printf("time spent producing: %s", productionduration) log.Printf("time spent consuming: %s", consumptionduration) log.Printf("total produced/consumed: %d/%d", totalproduced, totalconsumed) log.Println("collecting metrics") rdr.Collect(context.Background(), &rm) if err = display(rm); err != nil { log.Panicf("failed displaying metrics: %s", err) } if totalproduced != totalconsumed { log.Panicf("total produced and consumed don't match: %d vs %d", totalproduced, totalconsumed) } log.Println("bench run completed successfully") } func produce(ctx context.Context, p *kafka.Producer, topic apmqueue.Topic, size int, duration time.Duration) error { buf := make([]byte, size) if _, err := rand.Read(buf); err != nil { return fmt.Errorf("cannot read random bytes: %w", err) } record := apmqueue.Record{ Topic: topic, Value: buf, } deadline := time.Now().Add(duration) for time.Now().Before(deadline) { if err := p.Produce(ctx, record); err != nil { return err } } return nil } func logging(verbose bool) *zap.Logger { var logger *zap.Logger var err error if verbose { logger, err = zap.NewDevelopment() if err != nil { log.Fatalf("cannot create zap logger: %s", err) } } else { logger = zap.NewNop() } return logger }