in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaProducerBenchmarks.scala [41:71]
/**
 * Pushes a fixed-size payload through `fixture.flow` once for every integer in
 * `0 to fixture.msgCount` and blocks until the stream completes.
 *
 * Each integer selects the target partition (`number % fixture.numberOfPartitions`) and is
 * also attached to the produced message as its pass-through value. `meter` is marked once
 * for every element the flow emits. Whenever a single-record result carries an offset that
 * is a multiple of `logStep`, the time elapsed since the previous such log line is reported.
 *
 * NOTE(review): `0 to n` is an inclusive range, so `fixture.msgCount + 1` messages are sent;
 * confirm this is intended.
 *
 * @param fixture supplies the topic, partition count, message size/count and the producer flow
 * @param meter   marked once per element emitted by the producer flow
 * @param mat     materializer used to run the stream
 */
def plainFlow(fixture: ReactiveKafkaProducerTestFixture[Int], meter: Meter)(implicit mat: Materializer): Unit = {
  logger.debug("Creating and starting a stream")

  // Start of the current logging window. It is updated from inside the stream stage below,
  // presumably on a different thread than the caller, hence @volatile.
  @volatile var windowStartNanos = System.nanoTime()

  // The same payload string is reused for every record.
  val payload = PerfFixtureHelpers.stringOfSize(fixture.msgSize)

  // Stage 1: turn each index into a single-record producer envelope.
  val envelopes = Source(0 to fixture.msgCount).map { index =>
    val targetPartition: Int = (index % fixture.numberOfPartitions).toInt
    val record = new ProducerRecord[Array[Byte], String](fixture.topic, targetPartition, null, payload)
    ProducerMessage.single(record, index)
  }

  // Stage 2: run the envelopes through the producer flow, metering every emitted element.
  val completion = envelopes
    .via(fixture.flow)
    .map { emitted =>
      meter.mark()
      emitted match {
        case sent: Result[Array[Byte], String, Int] if sent.offset % logStep == 0 =>
          val windowEndNanos = System.nanoTime()
          val elapsed = (windowEndNanos - windowStartNanos).nanos
          logger.info(s"Sent ${sent.offset}, took ${elapsed.toMillis} ms to send last $logStep")
          // The next window starts where this one ended.
          windowStartNanos = windowEndNanos
        case _ =>
          // Other result kinds, and offsets off the log step, are only metered.
          ()
      }
      emitted
    }
    .runWith(Sink.ignore)

  // Block the calling thread until the stream is done or `streamingTimeout` elapses.
  Await.result(completion, atMost = streamingTimeout)
  logger.info("Stream finished")
}