def consumeTransformProduceTransaction()

in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaTransactionBenchmarks.scala [39:73]


  def consumeTransformProduceTransaction(fixture: TransactionFixture,
      meter: Meter)(implicit mat: Materializer): Unit = {
    logger.debug("Creating and starting a stream")
    val msgCount = fixture.msgCount
    val sinkTopic = fixture.sinkTopic
    val source = fixture.source

    val promise = Promise[Unit]()
    val logPercentStep = 1
    val loggedStep = if (msgCount > logPercentStep) 100 else 1

    val control = source
      .map { msg =>
        ProducerMessage.single(new ProducerRecord[Array[Byte], String](sinkTopic, msg.record.value()),
          msg.partitionOffset)
      }
      .via(fixture.flow)
      .toMat(Sink.foreach {
        case result: Result[Key, Val, PassThrough] =>
          meter.mark()
          val offset = result.offset
          if (result.offset % loggedStep == 0)
            logger.info(s"Transformed $offset elements to Kafka (${100 * offset / msgCount}%)")
          if (result.offset >= fixture.msgCount - 1)
            promise.complete(Success(()))
        case other: Results[Key, Val, PassThrough] =>
          meter.mark()
          promise.complete(Success(()))
      })(Keep.left)
      .run()

    Await.result(promise.future, streamingTimeout)
    control.shutdown()
    logger.debug("Stream finished")
  }