def run()

in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/PekkoConnectorsCommittableSinkFixtures.scala [117:147]


  /**
   * Runs one benchmark pass: every element from `fixture.source` is turned into a single
   * producer message for `fixture.sinkTopic` (carrying the element's committable offset as
   * pass-through), counted on `meter`, and handed to `fixture.sink`.
   *
   * The call blocks until an element whose partition offset is at least `msgCount - 1` has
   * been seen, or until `streamingTimeout` elapses (in which case `Await.result` throws).
   * Afterwards a drain-and-shutdown of the stream is triggered.
   *
   * @param fixture supplies the source, the sink, the target topic and the expected message count
   * @param meter   marked once per element flowing through the stream
   * @param mat     materializer used to run the stream; its execution context is also used
   *                for the drain-and-shutdown call
   */
  def run(fixture: Fixture, meter: Meter)(implicit mat: Materializer): Unit = {
    logger.debug("Creating and starting a stream")
    val msgCount = fixture.msgCount
    val sinkTopic = fixture.sinkTopic
    val source = fixture.source

    // Completed by the stream once the last expected offset has been observed.
    val promise = Promise[Unit]()
    val logPercentStep = 1
    // NOTE(review): this compares a message count with a percentage and yields a fixed step of
    // 100 (or 1 for tiny runs); possibly meant to derive the step from msgCount - confirm intent.
    val loggedStep = if (msgCount > logPercentStep) 100 else 1

    val control = source
      .map { msg =>
        ProducerMessage.single(new ProducerRecord[Array[Byte], String](sinkTopic, msg.record.value()),
          msg.committableOffset)
      }
      .map { msg =>
        meter.mark()
        val offset = msg.passThrough.partitionOffset.offset
        if (offset % loggedStep == 0)
          logger.info(s"Transformed $offset elements to Kafka (${100 * offset / msgCount}%)")
        // trySuccess instead of complete: more than one element can satisfy this condition
        // before the stream is shut down, and completing an already completed promise throws
        // IllegalStateException, which would fail the stream.
        if (offset >= msgCount - 1)
          promise.trySuccess(())
        msg
      }
      .toMat(fixture.sink)(DrainingControl.apply)
      .run()

    Await.result(promise.future, streamingTimeout)
    // The future returned by drainAndShutdown is not awaited here, so the final log line below
    // may be emitted before the stream has fully drained.
    control.drainAndShutdown()(mat.executionContext)
    logger.debug("Stream finished")
  }