def plainFlow()

in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/ReactiveKafkaProducerBenchmarks.scala [41:71]


  /**
   * Pushes a sequence of fixed-size string records through `fixture.flow` and blocks
   * until the stream has completed.
   *
   * One envelope is produced for every integer in `0 to fixture.msgCount` (inclusive on
   * both ends), with the integer itself used as pass-through and, modulo
   * `fixture.numberOfPartitions`, as the target partition. The record key is `null`.
   *
   * @param fixture supplies the topic, message size and count, partition count and the
   *                producer flow under test
   * @param meter   marked once for every element emitted by the producer flow
   * @param mat     materializer used to run the stream
   */
  def plainFlow(fixture: ReactiveKafkaProducerTestFixture[Int], meter: Meter)(implicit mat: Materializer): Unit = {
    logger.debug("Creating and starting a stream")
    // Start of the current logging window; read and reassigned from inside the stream stage.
    @volatile var windowStart = System.nanoTime()

    // The same payload is reused for every record.
    val payload = PerfFixtureHelpers.stringOfSize(fixture.msgSize)

    // NOTE(review): `to` is inclusive, so msgCount + 1 envelopes are emitted — confirm this is intended.
    val envelopes = Source(0 to fixture.msgCount).map { seqNr =>
      val targetPartition: Int = (seqNr % fixture.numberOfPartitions).toInt
      val record = new ProducerRecord[Array[Byte], String](fixture.topic, targetPartition, null, payload)
      ProducerMessage.single(record, seqNr)
    }

    val metered = envelopes.via(fixture.flow).map { outcome =>
      // Every emitted element counts towards the meter, whatever its concrete kind.
      meter.mark()
      outcome match {
        case single: Result[Array[Byte], String, Int] =>
          // Report progress whenever the record's offset is a multiple of logStep.
          if (single.offset % logStep == 0) {
            val windowEnd = System.nanoTime()
            val elapsed = (windowEnd - windowStart).nanos
            logger.info(s"Sent ${single.offset}, took ${elapsed.toMillis} ms to send last $logStep")
            windowStart = windowEnd
          }
          single

        case other: Results[Array[Byte], String, Int] =>
          other
      }
    }

    val done = metered.runWith(Sink.ignore)
    // Block the caller until the stream finishes or streamingTimeout elapses.
    Await.result(done, atMost = streamingTimeout)
    logger.info("Stream finished")
  }