func main()

in cmd/queuebench/main.go [40:184]


// main runs a single queue benchmark: it parses the configuration, sets up
// the benchmark resources, produces records for cfg.duration while a consumer
// runs in the background, waits until the consumer has fetched at least as
// many records as were produced (or cfg.timeout elapses), and finally prints
// timings and the collected metrics.
//
// Failures are reported with log.Panic* rather than log.Fatal* so that the
// deferred cleanup (signal release, benchmark teardown, ticker/timer stop)
// still runs before the outermost deferred recover exits via log.Fatal.
func main() {
	// NOTE: intercept any panic and terminate gracefully
	// This allows using log.Panic in main which triggers
	// deferred functions (whereas log.Fatal doesn't).
	defer func() {
		if r := recover(); r != nil {
			log.Fatal(r)
		}
	}()

	cfg := config{}
	cfg.Parse()
	log.Printf("parsed config: %+v\n", cfg)

	log.Println("prep logger")
	var err error
	logger := logging(cfg.verbose)

	log.Println("prep MeterProvider")
	mp, rdr := metering()

	// The main context is cancelled on SIGINT/SIGTERM so that the consumer,
	// the producer and the wait loop below can all react to an interruption.
	ctx := context.Background()
	ctx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
	defer stop()

	// The Unix timestamp identifies this run; it is used to build a unique
	// consumer group ID and topic name.
	run := time.Now().Unix()
	log.Printf("running bench run: %d", run)

	bench := bench{
		Brokers:         []string{cfg.broker},
		ConsumerGroupID: fmt.Sprintf("queuebench-%d", run),
		Logger:          logger,
		Partitions:      cfg.partitions,
		TopicNamespace:  namespace,
		Topics: []apmqueue.Topic{
			apmqueue.Topic(fmt.Sprintf("run-%d", run)),
		},

		mp: mp,
		tp: noop.NewTracerProvider(),
	}

	log.Println("running benchmark setup")
	if err = bench.Setup(ctx); err != nil {
		log.Panicf("benchmark setup failed: %s", err)
	}
	teardown := func() {
		log.Println("running benchmark teardown")
		// NOTE: using a different context to prevent this function
		// being affected by main context being closed
		if err := bench.Teardown(context.Background()); err != nil {
			log.Panicf("benchmark teardown failed: %s", err)
		}

	}
	defer teardown()

	start := time.Now()
	log.Println("==> running benchmark")

	log.Println("start consuming")
	// consumptiondone carries the time spent in the consumer run loop back to
	// main. It is buffered so the goroutine never blocks on the send, even if
	// main has already stopped waiting for it.
	consumptiondone := make(chan time.Duration, 1)
	go func() {
		// NOTE: log.Panicf panics again after logging; as this happens in a
		// goroutine other than main, it is not caught by main's recover.
		defer func() {
			if r := recover(); r != nil {
				log.Panicf("consumer loop panicked: %s", r)
			}
		}()

		consumptionstart := time.Now()
		if err := bench.c.Run(ctx); err != nil {
			log.Printf("consumer run ended with an error: %s", err)
		}
		consumptiondone <- time.Since(consumptionstart)
	}()

	log.Printf("start producing, will produce for %s", cfg.duration)
	productionstart := time.Now()
	if err := produce(ctx, bench.p, bench.Topics[0], cfg.eventSize, cfg.duration); err != nil {
		log.Printf("error while producing records: %s", err)
	}
	productionduration := time.Since(productionstart)

	log.Println("production ended")

	log.Println("waiting for consumer to fetch all records")
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	log.Printf("timeout set to: %s", time.Now().Add(cfg.timeout))
	timer := time.NewTimer(cfg.timeout)
	defer timer.Stop()

	// Poll the metrics reader on every tick until the consumer has caught up
	// with the producer, the timeout fires, or the context is cancelled.
	var rm metricdata.ResourceMetrics
	totalproduced := int64(0)
	totalconsumed := int64(0)
wait:
	for {
		select {
		case <-ctx.Done():
			log.Panic("context closed, terminating execution")
		case <-timer.C:
			log.Println("reached timeout, moving on")
			break wait
		case <-ticker.C:
			if err := rdr.Collect(ctx, &rm); err != nil {
				// NOTE: consider any error here as transient and don't trigger termination
				log.Printf("cannot collect otel metrics: %s", err)
				continue
			}

			totalproduced = getSumInt64Metric("github.com/elastic/apm-queue/kafka", "producer.messages.produced", rm)
			totalconsumed = getSumInt64Metric("github.com/elastic/apm-queue/kafka", "consumer.messages.fetched", rm)
			if totalconsumed >= totalproduced {
				log.Println("consumption ended")
				break wait
			}
		}
	}
	if err := bench.c.Close(); err != nil {
		log.Panicf("error closing consumer: %s", err)
	}

	// Receive the consumption duration from the consumer goroutine instead of
	// reading a variable it writes concurrently (which would be a data race).
	// The wait is bounded so that a run loop which does not return after
	// Close cannot hang the benchmark; in that case the duration stays zero.
	var consumptionduration time.Duration
	select {
	case consumptionduration = <-consumptiondone:
	case <-ctx.Done():
		log.Println("context closed before consumer stopped, consumption duration unavailable")
	case <-time.After(cfg.timeout):
		log.Println("consumer did not stop in time, consumption duration unavailable")
	}

	log.Println("==> benchmark ")
	if err := bench.p.Close(); err != nil {
		log.Panicf("error closing producer: %s", err)
	}

	duration := time.Since(start)
	log.Printf("it took %s (-duration=%s)", duration, cfg.duration)
	log.Printf("time spent producing: %s", productionduration)
	log.Printf("time spent consuming: %s", consumptionduration)
	log.Printf("total produced/consumed: %d/%d", totalproduced, totalconsumed)

	log.Println("collecting metrics")
	// NOTE: a background context is used so the final collection is not
	// affected by the main context being closed. A failure here is logged
	// and whatever rm currently holds is still displayed.
	if err := rdr.Collect(context.Background(), &rm); err != nil {
		log.Printf("cannot collect otel metrics: %s", err)
	}
	if err = display(rm); err != nil {
		log.Panicf("failed displaying metrics: %s", err)
	}

	if totalproduced != totalconsumed {
		log.Panicf("total produced and consumed don't match: %d vs %d", totalproduced, totalconsumed)
	}

	log.Println("bench run completed successfully")
}