def consumeCommitAtMostOnce()

in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerBenchmarks.scala [177:209]


  /**
   * Reads records from the fixture's consumer in a loop, asynchronously committing an offset for
   * every record read, until at least `fixture.msgCount` records have been consumed. The fixture
   * is closed when the loop finishes, whether it completes normally or throws.
   *
   * @param fixture supplies the consumer, the topic name and the number of messages to read
   * @param meter   marked once for every record read
   */
  def consumeCommitAtMostOnce(fixture: KafkaConsumerTestFixture, meter: Meter): Unit = {
    val consumer = fixture.consumer
    val assignment = consumer.assignment()

    @tailrec
    def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
      if (readSoFar >= readLimit)
        readSoFar
      else {
        logger.debug("Polling")
        // The partitions must NOT be paused here: poll() returns no records for paused partitions,
        // and the only resume() happens in the commit callback below. Pausing before the poll would
        // leave the loop spinning without ever reading a record.
        val records = consumer.poll(pollTimeoutMs)

        for (record <- records.iterator().asScala) {
          meter.mark()
          // NOTE(review): Kafka's convention is to commit "last processed offset + 1"; this commits
          // record.offset() as-is and always targets partition 0 — confirm this is intended here.
          val offsetMap = Map(new TopicPartition(fixture.topic, 0) -> new OffsetAndMetadata(record.offset()))
          // Stop fetching while the commit is in flight; the callback resumes the partitions.
          consumer.pause(assignment)

          consumer.commitAsync(
            offsetMap.asJava,
            new OffsetCommitCallback {
              // The commit outcome (including any error in `e`) is not inspected; fetching is
              // resumed either way.
              override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit =
                consumer.resume(assignment)
            })
        }

        val recordCount = records.count()
        logger.debug(s"${readSoFar + recordCount} records read. Limit = $readLimit")
        pollInLoop(readLimit, readSoFar + recordCount)
      }

    // Close the fixture even when polling fails so the consumer is not leaked.
    try {
      pollInLoop(readLimit = fixture.msgCount)
      ()
    } finally fixture.close()
  }