in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/Timed.scala [39:65]
/**
 * Builds an `Slf4jReporter` over the given registry.
 *
 * Rates are converted to per-second values and durations to milliseconds.
 * The reporter is only built here; this method does not start it.
 *
 * @param metricRegistry registry whose metrics the reporter will read
 * @return the configured reporter
 */
def reporter(metricRegistry: MetricRegistry): ScheduledReporter = {
  val ratesPerSecond = Slf4jReporter.forRegistry(metricRegistry).convertRatesTo(TimeUnit.SECONDS)
  val durationsInMillis = ratesPerSecond.convertDurationsTo(TimeUnit.MILLISECONDS)
  durationsInMillis.build()
}
/**
 * Builds a `CsvReporter` over the given registry, handing it
 * `benchmarkReportBasePath` (as a `File`) as its build target.
 *
 * Rates are converted to per-second values and durations to milliseconds.
 * The reporter is only built here; this method does not start it.
 *
 * @param metricRegistry registry whose metrics the reporter will read
 * @return the configured reporter
 */
def csvReporter(metricRegistry: MetricRegistry): ScheduledReporter = {
  val ratesPerSecond = CsvReporter.forRegistry(metricRegistry).convertRatesTo(TimeUnit.SECONDS)
  val durationsInMillis = ratesPerSecond.convertDurationsTo(TimeUnit.MILLISECONDS)
  // The base path is resolved only after the builder is configured, as in the chained form.
  durationsInMillis.build(benchmarkReportBasePath.toFile)
}
/**
 * Writes the collected in-flight metrics for a test to two files resolved
 * against `benchmarkReportBasePath`, then blocks until both writes complete.
 *
 *  - `<testName>-inflight-metrics.csv` receives only the first and last rows
 *    of `inflight`; each formatted chunk of that summary is also logged at
 *    info level.
 *  - `<testName>-inflight-metrics-details.csv` receives every row.
 *
 * Both streams are started before waiting, so the two files are written
 * concurrently. The wait is capped at 10 seconds.
 *
 * @param inflight rows to report; the first row is expected to be the header
 * @param testName prefix used for both output file names
 * @param mat      materializer used to run the streams; its execution context
 *                 is also used to combine the two write results
 * @return the results of the summary write and the details write, in that order
 * @throws IllegalArgumentException if `inflight` holds fewer than two rows
 */
def inflightMetricsReport(inflight: List[List[String]], testName: String)(
    implicit mat: Materializer) = {
  val summaryFile = benchmarkReportBasePath.resolve(Paths.get(s"$testName-inflight-metrics.csv"))
  val detailsFile = benchmarkReportBasePath.resolve(Paths.get(s"$testName-inflight-metrics-details.csv"))
  require(inflight.size > 1, "At least 2 records (a header and a data row) are required to make a report.")

  // Summary: header row plus the final row only, echoed to the log while being written.
  val headerAndLastRow = List(inflight.head, inflight.last)
  val summaryWrite =
    Source(headerAndLastRow)
      .via(format())
      .alsoTo(Sink.foreach(chunk => logger.info(chunk.utf8String)))
      .runWith(FileIO.toPath(summaryFile))

  // Details: every row, written without logging.
  val detailsWrite =
    Source(inflight)
      .via(format())
      .runWith(FileIO.toPath(detailsFile))

  implicit val ec: ExecutionContext = mat.executionContext
  val bothWrites = Future.sequence(List(summaryWrite, detailsWrite))
  Await.result(bothWrites, 10.seconds)
}