def pollForMetrics()

in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/InflightMetrics.scala [55:101]


  def pollForMetrics(
      interval: FiniteDuration,
      control: Control,
      consumerMetricNames: List[ConsumerMetricRequest],
      brokerMetricNames: List[BrokerMetricRequest],
      brokerJmxUrls: List[String])(implicit mat: Materializer): (Cancellable, Future[List[List[String]]]) = {
    implicit val ec: ExecutionContext = mat.executionContext

    val consumerMetricNamesSorted = consumerMetricNames.sortBy(_.name)
    val brokerMetricNamesSorted = brokerMetricNames.sortBy(_.name)

    val accStart = (0.seconds, None.asInstanceOf[Option[List[Metric]]])
    val brokersJmx = jmxConnections(brokerJmxUrls)

    val (metricsControl, metricsFuture) = Source
      .tick(0.seconds, interval, NotUsed)
      .scanAsync(accStart) {
        case ((timeMs, accLastMetrics), _) =>
          getAllMetrics(control, consumerMetricNamesSorted, brokerMetricNamesSorted, brokersJmx).map {
            case jvmMetrics :: consumerMetrics :: brokerMetrics :: Nil =>
              val timeMsMeasurement = Measurement(timeMsHeader, timeMs.toMillis.toDouble, GaugeMetricType)
              val newMetrics = timeMsMeasurement +: (jvmMetrics ++ consumerMetrics ++ brokerMetrics)
              val nextInterval = interval + timeMs
              val nextAcc = accLastMetrics match {
                case None              => InflightMetrics.reset(newMetrics, registry)
                case Some(lastMetrics) => InflightMetrics.update(lastMetrics, newMetrics)
              }
              (nextInterval, Some(nextAcc))
            case _ => throw new IllegalStateException("The wrong number of Future results were returned.")
          }
      }
      .mapConcat { case (_, results: Option[List[Metric]]) => results.toList }
      .toMat(Sink.seq)(Keep.both)
      .run()

    val metricsWithHeaderAndFooter: Future[List[List[String]]] = metricsFuture.map { metrics =>
      val header = timeMsHeader +: metricHeaders(consumerMetricNamesSorted, brokerMetricNamesSorted)
      val metricsStrings = metrics.map(_.map(_.value.toString)).toList
      val summaryLine = metrics.last.map {
        case hg: HistogramGauge => hg.summaryValue.toString
        case metric             => metric.value.toString
      }
      header +: metricsStrings :+ summaryLine
    }

    (metricsControl, metricsWithHeaderAndFooter)
  }