in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaTransactionBenchmarks.scala [39:73]
def consumeTransformProduceTransaction(fixture: TransactionFixture,
meter: Meter)(implicit mat: Materializer): Unit = {
logger.debug("Creating and starting a stream")
val msgCount = fixture.msgCount
val sinkTopic = fixture.sinkTopic
val source = fixture.source
val promise = Promise[Unit]()
val logPercentStep = 1
val loggedStep = if (msgCount > logPercentStep) 100 else 1
val control = source
.map { msg =>
ProducerMessage.single(new ProducerRecord[Array[Byte], String](sinkTopic, msg.record.value()),
msg.partitionOffset)
}
.via(fixture.flow)
.toMat(Sink.foreach {
case result: Result[Key, Val, PassThrough] =>
meter.mark()
val offset = result.offset
if (result.offset % loggedStep == 0)
logger.info(s"Transformed $offset elements to Kafka (${100 * offset / msgCount}%)")
if (result.offset >= fixture.msgCount - 1)
promise.complete(Success(()))
case other: Results[Key, Val, PassThrough] =>
meter.mark()
promise.complete(Success(()))
})(Keep.left)
.run()
Await.result(promise.future, streamingTimeout)
control.shutdown()
logger.debug("Stream finished")
}