private def createTopicAndFill()

in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/PerfFixtureHelpers.scala [83:119]


  private def createTopicAndFill(ft: FilledTopic, props: java.util.Map[String, AnyRef], admin: Admin) = {
    val result = admin.createTopics(
      Arrays.asList(
        new NewTopic(ft.topic, ft.numberOfPartitions, ft.replicationFactor.toShort)
          .configs(new util.HashMap[String, String]())))
    result.all().get(10, TimeUnit.SECONDS)
    // fill topic with messages
    val producer =
      new KafkaProducer[Array[Byte], String](props, new ByteArraySerializer, new StringSerializer)
    val lastElementStoredPromise = Promise[Unit]()
    val loggedStep = if (ft.msgCount > logPercentStep) ft.msgCount / (100 / logPercentStep) else 1
    val msg = stringOfSize(ft.msgSize)
    for (i <- 0L to ft.msgCount.toLong) {
      if (!lastElementStoredPromise.isCompleted) {
        val partition: Int = (i % ft.numberOfPartitions).toInt
        producer.send(
          new ProducerRecord[Array[Byte], String](ft.topic, partition, null, msg),
          new Callback {
            override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit =
              if (e == null) {
                if (i % loggedStep == 0)
                  logger.info(s"Written $i elements to Kafka (${100 * i / ft.msgCount}%)")
                if (i >= ft.msgCount - 1 && !lastElementStoredPromise.isCompleted)
                  lastElementStoredPromise.success(())
              } else {
                if (!lastElementStoredPromise.isCompleted) {
                  e.printStackTrace()
                  lastElementStoredPromise.failure(e)
                }
              }
          })
      }
    }
    val lastElementStoredFuture = lastElementStoredPromise.future
    Await.result(lastElementStoredFuture, atMost = producerTimeout)
    producer
  }