def reporter()

in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala [39:65]


  def reporter(metricRegistry: MetricRegistry): ScheduledReporter =
    Slf4jReporter
      .forRegistry(metricRegistry)
      .convertRatesTo(TimeUnit.SECONDS)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .build()

  def csvReporter(metricRegistry: MetricRegistry): ScheduledReporter =
    CsvReporter
      .forRegistry(metricRegistry)
      .convertRatesTo(TimeUnit.SECONDS)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .build(benchmarkReportBasePath.toFile)

  def inflightMetricsReport(inflight: List[List[String]], testName: String)(
      implicit mat: Materializer) = {
    val metricsReportPath = benchmarkReportBasePath.resolve(Paths.get(s"$testName-inflight-metrics.csv"))
    val metricsReportDetailPath = benchmarkReportBasePath.resolve(Paths.get(s"$testName-inflight-metrics-details.csv"))
    require(inflight.size > 1, "At least 2 records (a header and a data row) are required to make a report.")
    val summary = Source(List(inflight.head, inflight.last))
      .via(format())
      .alsoTo(Sink.foreach(bs => logger.info(bs.utf8String)))
      .runWith(FileIO.toPath(metricsReportPath))
    val details = Source(inflight).via(format()).runWith(FileIO.toPath(metricsReportDetailPath))
    implicit val ec: ExecutionContext = mat.executionContext
    Await.result(Future.sequence(List(summary, details)), 10.seconds)
  }