def consumePlainInflightMetrics()

in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaConsumerBenchmarks.scala [79:107]


  def consumePlainInflightMetrics(fixture: NonCommittableFixture,
      meter: Meter,
      consumerMetricNames: List[ConsumerMetricRequest],
      brokerMetricNames: List[BrokerMetricRequest],
      brokerJmxUrls: List[String])(
      implicit mat: Materializer): List[List[String]] = {
    logger.debug("Creating and starting a stream")
    val (control, future) = fixture.source
      .take(fixture.msgCount.toLong)
      .map { msg =>
        meter.mark(); msg
      }
      .toMat(Sink.ignore)(Keep.both)
      .run()

    val (metricsControl, metricsFuture) =
      pollForMetrics(interval = 100.millis, control, consumerMetricNames, brokerMetricNames, brokerJmxUrls)

    implicit val ec: ExecutionContext = mat.executionContext
    future.onComplete { _ =>
      metricsControl.cancel()
    }

    Await.result(future, atMost = streamingTimeout)
    logger.debug("Stream finished")

    val inflightMetrics = Await.result(metricsFuture, atMost = streamingTimeout)
    inflightMetrics
  }