in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableSinkFixtures.scala [117:147]
/**
 * Runs one benchmark iteration: reads from `fixture.source`, turns every consumed record into a
 * producer message for `fixture.sinkTopic` (carrying the committable offset as pass-through) and
 * feeds the result into `fixture.sink`.
 *
 * The call blocks until an element whose partition offset is at least `fixture.msgCount - 1` has
 * passed the metering stage, or until `streamingTimeout` elapses. The stream is then asked to
 * drain and shut down; that request is issued even when the wait fails.
 *
 * @param fixture supplies the source, the sink, the target topic and the expected message count
 * @param meter   marked once for every element that flows through the stream
 * @param mat     materializer used to run the stream; its execution context drives the shutdown
 * @throws java.util.concurrent.TimeoutException if the completion signal does not arrive within
 *                                               `streamingTimeout`
 */
def run(fixture: Fixture, meter: Meter)(implicit mat: Materializer): Unit = {
  logger.debug("Creating and starting a stream")
  val msgCount = fixture.msgCount
  val sinkTopic = fixture.sinkTopic
  val source = fixture.source
  // Completed from inside the stream once the last expected offset has been observed.
  val promise = Promise[Unit]()
  // NOTE(review): `logPercentStep` is named like a percentage, yet the step actually used is a
  // fixed number of offsets (100, or 1 for tiny runs) — confirm the intended logging cadence.
  val logPercentStep = 1
  val loggedStep = if (msgCount > logPercentStep) 100 else 1
  val control = source
    .map { msg =>
      ProducerMessage.single(new ProducerRecord[Array[Byte], String](sinkTopic, msg.record.value()),
        msg.committableOffset)
    }
    .map { msg =>
      meter.mark()
      // NOTE(review): progress and completion are keyed on the record's partition offset, which
      // presumably assumes a single-partition topic starting at offset 0 — confirm.
      val offset = msg.passThrough.partitionOffset.offset
      if (offset % loggedStep == 0)
        logger.info(s"Transformed $offset elements to Kafka (${100 * offset / msgCount}%)")
      // `trySuccess` is idempotent: more than one element may satisfy this condition, and
      // completing an already-completed promise with `complete` would throw and fail the stream.
      if (offset >= msgCount - 1)
        promise.trySuccess(())
      msg
    }
    .toMat(fixture.sink)(DrainingControl.apply)
    .run()
  // Always request shutdown, even when the wait times out, so the running stream is not leaked.
  // As before, the future returned by `drainAndShutdown` is not awaited.
  try Await.result(promise.future, streamingTimeout)
  finally control.drainAndShutdown()(mat.executionContext)
  logger.debug("Stream finished")
}