in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerBenchmarks.scala [77:129]
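/**
 * At-least-once consumer: reads messages in a loop and commits the processed
 * offsets asynchronously in batches of `batchSize`. If a batch fills up while
 * the previous commit is still in flight, the assigned partitions are paused
 * until that commit completes.
 */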
def consumerAtLeastOnceBatched(batchSize: Int)(fixture: KafkaConsumerTestFixture, meter: Meter): Unit = {
  val consumer = fixture.consumer
  var lastProcessedOffset = Map.empty[Int, Long]
  var accumulatedMsgCount = 0L
  var commitInProgress = false
  val assignment = consumer.assignment()
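
  // Asynchronously commits the highest offset seen per partition and resets
  // the batch state. The commit callback clears `commitInProgress` once the
  // broker acknowledges (or fails) the commit.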
  def doCommit(): Unit = {
    accumulatedMsgCount = 0
    val offsetMap = lastProcessedOffset.map {
      case (partition, offset) =>
        new TopicPartition(fixture.topic, partition) -> new OffsetAndMetadata(offset)
    }
    logger.debug(s"Committing offset ${offsetMap.head._2.offset()}")
    consumer.commitAsync(
      offsetMap.asJava,
      new OffsetCommitCallback {
        override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit =
          commitInProgress = false
      })
    lastProcessedOffset = Map.empty[Int, Long]
  }
  @tailrec
  def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
    if (readSoFar >= readLimit)
      readSoFar
    else {
      logger.debug("Polling")
      if (!commitInProgress)
        consumer.resume(assignment)
      val records = consumer.poll(pollTimeoutMs)
      for (record <- records.iterator().asScala) {
        accumulatedMsgCount += 1
        meter.mark()
        lastProcessedOffset += record.partition() -> record.offset()
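        // A full batch has been read: commit it, or pause consumption until
        // the previous async commit completes.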
        if (accumulatedMsgCount >= batchSize) {
          if (!commitInProgress) {
            commitInProgress = true
            doCommit()
          } else // previous commit still in progress
            consumer.pause(assignment)
        }
      }
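      // Advance the read tally by the size of this poll's batch and recurse.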
      val recordCount = records.count()
      logger.debug(s"${readSoFar + recordCount} records read. Limit = $readLimit")
      pollInLoop(readLimit, readSoFar + recordCount)
    }
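
  // Drive the benchmark: consume the fixture's full message count, then
  // release the fixture's resources.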
  pollInLoop(readLimit = fixture.msgCount)
  fixture.close()
}