in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaConsumerBenchmarks.scala [79:107]
/**
 * Runs the fixture's plain consumer stream to completion while collecting in-flight metrics.
 *
 * The stream takes `fixture.msgCount` elements from `fixture.source`, marks `meter` once per
 * element and discards the elements. Metrics polling is started right after the stream is
 * materialized, with a 100 ms interval, and is cancelled as soon as the stream's completion
 * future finishes, whether it succeeded or failed.
 *
 * This method blocks the calling thread: first until the stream completes, then until the
 * polled metrics are available, each wait bounded by `streamingTimeout`.
 *
 * @param fixture             supplies the source to consume and the number of messages to take
 * @param meter               marked once for every element that passes through the stream
 * @param consumerMetricNames consumer metric requests forwarded to `pollForMetrics`
 * @param brokerMetricNames   broker metric requests forwarded to `pollForMetrics`
 * @param brokerJmxUrls       broker JMX URLs forwarded to `pollForMetrics`
 * @param mat                 materializer used to run the stream; its execution context runs
 *                            the completion callback
 * @return the value produced by the metrics polling future (presumably one row of metric
 *         values per poll -- confirm against `pollForMetrics`)
 */
def consumePlainInflightMetrics(fixture: NonCommittableFixture,
    meter: Meter,
    consumerMetricNames: List[ConsumerMetricRequest],
    brokerMetricNames: List[BrokerMetricRequest],
    brokerJmxUrls: List[String])(
    implicit mat: Materializer): List[List[String]] = {
  logger.debug("Creating and starting a stream")

  // Materialize both the source's control and the sink's completion future.
  val (consumerControl, streamDone) =
    fixture.source
      .take(fixture.msgCount.toLong)
      .map { record =>
        meter.mark()
        record
      }
      .toMat(Sink.ignore)(Keep.both)
      .run()

  // Polling starts only after the stream is running, since it needs the stream's control.
  val (metricsPoller, collectedMetrics) =
    pollForMetrics(interval = 100.millis, consumerControl, consumerMetricNames, brokerMetricNames, brokerJmxUrls)

  // Stop polling once the stream finishes, regardless of success or failure.
  streamDone.onComplete(_ => metricsPoller.cancel())(mat.executionContext)

  Await.result(streamDone, atMost = streamingTimeout)
  logger.debug("Stream finished")

  Await.result(collectedMetrics, atMost = streamingTimeout)
}