in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerBenchmarks.scala [177:209]
/**
 * Reads `fixture.msgCount` records with the fixture's Kafka consumer, marking `meter` once per
 * record and committing each record's offset asynchronously.
 *
 * For every record the assigned partitions are paused before `commitAsync` is issued, and the
 * commit callback resumes them again. The fixture is closed once the read limit is reached.
 *
 * @param fixture supplies the consumer, the topic name and the number of messages to read
 * @param meter   marked once for every record returned by a poll
 */
def consumeCommitAtMostOnce(fixture: KafkaConsumerTestFixture, meter: Meter): Unit = {
  val consumer = fixture.consumer
  // Captured once up front; the same set is used for every pause/resume below.
  val assignment = consumer.assignment()

  @tailrec
  def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
    if (readSoFar >= readLimit)
      readSoFar
    else {
      logger.debug("Polling")
      // The partitions must NOT be paused before polling: a paused partition yields no records,
      // and the only resume happens in a commit callback, which is never scheduled unless a
      // record was received first. Pausing here would leave the loop spinning on empty polls.
      val records = consumer.poll(pollTimeoutMs)
      for (record <- records.iterator().asScala) {
        meter.mark()
        // NOTE(review): the partition is hard-coded to 0 and the committed offset is the
        // record's own offset (not offset + 1) - presumably acceptable for a single-partition
        // benchmark topic; confirm against the fixture setup.
        val offsetMap = Map(new TopicPartition(fixture.topic, 0) -> new OffsetAndMetadata(record.offset()))
        // Stop fetching until this commit has been acknowledged.
        consumer.pause(assignment)
        consumer.commitAsync(
          offsetMap.asJava,
          new OffsetCommitCallback {
            // The exception argument is ignored: the partitions are resumed whether or not
            // the commit succeeded.
            override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit =
              consumer.resume(assignment)
          })
      }
      val recordCount = records.count()
      logger.debug(s"${readSoFar + recordCount} records read. Limit = $readLimit")
      pollInLoop(readLimit, readSoFar + recordCount)
    }

  pollInLoop(readLimit = fixture.msgCount)
  fixture.close()
}