in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala [166:188]
/**
 * Reads the current values of the requested consumer metrics from `control`.
 *
 * A metric is selected when its name equals the name of one of the `requests` and it carries
 * neither a `topic` nor a `partition` tag. Exactly one metric is expected per request; the
 * size check below fails otherwise.
 *
 * @param control  consumer control whose `metrics` future supplies the metric map
 * @param requests the metrics to read
 * @param ec       execution context used to transform the `metrics` future
 * @return one [[Measurement]] per request, in the same order as `requests`; the future fails
 *         with an `IllegalArgumentException` when the number of matching metrics differs from
 *         the number of requests
 */
private def consumer[T](control: Control, requests: List[ConsumerMetricRequest])(
    implicit ec: ExecutionContext): Future[List[Measurement]] = {
  // Computed once instead of re-mapping `requests` for every metric in the filter below.
  val requestedNames = requests.map(_.name).toSet
  control.metrics.map { consumerMetrics =>
    val metricValues = consumerMetrics
      .filter { case (name, _) => requestedNames.contains(name.name()) }
      .filterNot { case (name, _) => name.tags.containsKey("topic") || name.tags.containsKey("partition") }
      .toList
      .sortBy { case (name, _) => name.name() }
      .map { case (_, value) => value.metricValue().asInstanceOf[Any] }
      .map(parseNumeric)
    require(metricValues.size == requests.size,
      "Number of returned metric values DNE number of requested consumer metrics")
    // `metricValues` is ordered by metric name, so the requests must be put in the same order
    // before the positional zip; zipping them in caller order would attach values to the wrong
    // metric names. The original position is carried along to restore the caller's order.
    requests.zipWithIndex
      .sortBy { case (request, _) => request.name }
      .zip(metricValues)
      .sortBy { case ((_, position), _) => position }
      .map {
        case ((ConsumerMetricRequest(name, metricType), _), value) => Measurement(name, value, metricType)
      }
  }
}