in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/PerfFixtureHelpers.scala [83:119]
/**
 * Creates the topic described by `ft` and fills it with identical string messages of
 * `ft.msgSize`, assigning record `i` to partition `i % ft.numberOfPartitions`.
 *
 * Blocks until the record with index `ft.msgCount - 1` (or a later one) has been
 * acknowledged, a send has failed, or `producerTimeout` has elapsed.
 *
 * @param ft    topic name, partition count, replication factor and message count/size
 * @param props configuration handed to the `KafkaProducer` used for filling
 * @param admin admin client used to create the topic (waits up to 10 seconds for creation)
 * @return the producer used for filling; it is returned open on success and is closed
 *         here only when filling fails
 */
private def createTopicAndFill(
    ft: FilledTopic,
    props: java.util.Map[String, AnyRef],
    admin: Admin): KafkaProducer[Array[Byte], String] = {
  val result = admin.createTopics(
    Arrays.asList(
      new NewTopic(ft.topic, ft.numberOfPartitions, ft.replicationFactor.toShort)
        .configs(new util.HashMap[String, String]())))
  result.all().get(10, TimeUnit.SECONDS)
  // fill topic with messages
  val producer =
    new KafkaProducer[Array[Byte], String](props, new ByteArraySerializer, new StringSerializer)
  try {
    val lastElementStoredPromise = Promise[Unit]()
    // Clamp to at least 1: the integer division can yield 0 for small message counts,
    // which would make `i % loggedStep` throw inside the producer callback.
    val loggedStep =
      if (ft.msgCount > logPercentStep) math.max(1, ft.msgCount / (100 / logPercentStep)) else 1
    val msg = stringOfSize(ft.msgSize)
    // NOTE(review): the range is inclusive, so up to `msgCount + 1` records may be sent.
    // Kept as-is because callers may depend on it -- confirm whether `until` was intended.
    for (i <- 0L to ft.msgCount.toLong) {
      // Stop issuing sends once the outcome is already decided (e.g. an earlier send failed).
      if (!lastElementStoredPromise.isCompleted) {
        val partition: Int = (i % ft.numberOfPartitions).toInt
        producer.send(
          new ProducerRecord[Array[Byte], String](ft.topic, partition, null, msg),
          new Callback {
            override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit =
              if (e == null) {
                // Skip the percentage when msgCount is 0 to avoid dividing by zero.
                if (ft.msgCount > 0 && i % loggedStep == 0)
                  logger.info(s"Written $i elements to Kafka (${100 * i / ft.msgCount}%)")
                // trySuccess/tryFailure are atomic, so concurrent callbacks cannot trip
                // over an already-completed promise.
                if (i >= ft.msgCount - 1)
                  lastElementStoredPromise.trySuccess(())
              } else if (lastElementStoredPromise.tryFailure(e)) {
                // Only the first failure is reported and propagated.
                e.printStackTrace()
              }
          })
      }
    }
    Await.result(lastElementStoredPromise.future, atMost = producerTimeout)
    producer
  } catch {
    case scala.util.control.NonFatal(e) =>
      // Do not leak the producer when filling fails; a zero timeout avoids blocking on
      // records that are still in flight.
      producer.close(java.time.Duration.ZERO)
      throw e
  }
}