in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala [55:101]
/**
 * Periodically samples JVM, consumer and broker metrics and collects them into a table of strings.
 *
 * A tick is emitted immediately and then once per `interval`. For every tick `getAllMetrics` is
 * called; its three result lists (JVM, consumer, broker) are concatenated behind a synthetic
 * elapsed-time measurement. The first sample is passed to `InflightMetrics.reset`, every later
 * sample is merged with the previous one through `InflightMetrics.update`.
 *
 * The elapsed-time column is the nominal time (`interval` multiplied by the number of samples
 * taken so far), not a wall-clock reading.
 *
 * NOTE(review): the JMX connections created by `jmxConnections` are not closed inside this
 * method - confirm they are released elsewhere.
 *
 * @param interval            time between two polls
 * @param control             consumer control handed to `getAllMetrics`
 * @param consumerMetricNames consumer metrics to collect; reported sorted by name
 * @param brokerMetricNames   broker metrics to collect; reported sorted by name
 * @param brokerJmxUrls       JMX URLs of the brokers to query
 * @return a `Cancellable` that stops the polling, and a future completed once polling has
 *         stopped. The future holds a header row, one row per sample and, if at least one
 *         sample was taken, a final summary row (for a `HistogramGauge` its `summaryValue`,
 *         for every other metric its last value). When no sample was taken only the header
 *         row is returned.
 */
def pollForMetrics(
    interval: FiniteDuration,
    control: Control,
    consumerMetricNames: List[ConsumerMetricRequest],
    brokerMetricNames: List[BrokerMetricRequest],
    brokerJmxUrls: List[String])(implicit mat: Materializer): (Cancellable, Future[List[List[String]]]) = {
  implicit val ec: ExecutionContext = mat.executionContext
  val consumerMetricNamesSorted = consumerMetricNames.sortBy(_.name)
  val brokerMetricNamesSorted = brokerMetricNames.sortBy(_.name)
  // Accumulator: (nominal elapsed time, metrics of the previous sample - None before the first one).
  val accStart = (0.seconds, Option.empty[List[Metric]])
  val brokersJmx = jmxConnections(brokerJmxUrls)
  val (metricsControl, metricsFuture) = Source
    .tick(0.seconds, interval, NotUsed)
    .scanAsync(accStart) {
      case ((timeMs, accLastMetrics), _) =>
        getAllMetrics(control, consumerMetricNamesSorted, brokerMetricNamesSorted, brokersJmx).map {
          case jvmMetrics :: consumerMetrics :: brokerMetrics :: Nil =>
            val timeMsMeasurement = Measurement(timeMsHeader, timeMs.toMillis.toDouble, GaugeMetricType)
            val newMetrics = timeMsMeasurement +: (jvmMetrics ++ consumerMetrics ++ brokerMetrics)
            val nextInterval = interval + timeMs
            val nextAcc = accLastMetrics match {
              case None              => InflightMetrics.reset(newMetrics, registry)
              case Some(lastMetrics) => InflightMetrics.update(lastMetrics, newMetrics)
            }
            (nextInterval, Some(nextAcc))
          case _ => throw new IllegalStateException("The wrong number of Future results were returned.")
        }
    }
    // scanAsync emits the initial accumulator first; its metrics are None and are dropped here.
    .mapConcat { case (_, results) => results.toList }
    .toMat(Sink.seq)(Keep.both)
    .run()
  val metricsWithHeaderAndFooter: Future[List[List[String]]] = metricsFuture.map { metrics =>
    val header = timeMsHeader +: metricHeaders(consumerMetricNamesSorted, brokerMetricNamesSorted)
    val metricsStrings = metrics.map(_.map(_.value.toString)).toList
    // The stream may finish before a single sample was taken (e.g. cancelled right away),
    // so the summary row is only built when a last sample exists.
    val summaryLine = metrics.lastOption.map(_.map {
      case hg: HistogramGauge => hg.summaryValue.toString
      case metric             => metric.value.toString
    })
    header +: (metricsStrings ++ summaryLine.toList)
  }
  (metricsControl, metricsWithHeaderAndFooter)
}