def consumerAtLeastOnceCommitEveryPoll()

in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerBenchmarks.scala [134:172]


  def consumerAtLeastOnceCommitEveryPoll()(fixture: KafkaConsumerTestFixture, meter: Meter): Unit = {
    val consumer = fixture.consumer

    var lastProcessedOffset = Map.empty[Int, Long]

    def doCommit(): Unit = {
      val offsetMap = lastProcessedOffset.map {
        case (partition, offset) =>
          new TopicPartition(fixture.topic, partition) -> new OffsetAndMetadata(offset)
      }
      logger.debug("Committing offset " + offsetMap.head._2.offset())
      consumer.commitAsync(
        offsetMap.asJava,
        new OffsetCommitCallback {
          override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit = ()
        })
      lastProcessedOffset = Map.empty[Int, Long]
    }

    @tailrec
    def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
      if (readSoFar >= readLimit)
        readSoFar
      else {
        logger.debug("Polling")
        val records = consumer.poll(pollTimeoutMs)
        for (record <- records.iterator().asScala) {
          meter.mark()
          lastProcessedOffset += record.partition() -> record.offset()
        }
        doCommit()
        val recordCount = records.count()
        logger.debug(s"${readSoFar + recordCount} records read. Limit = $readLimit")
        pollInLoop(readLimit, readSoFar + recordCount)
      }

    pollInLoop(readLimit = fixture.msgCount)
    fixture.close()
  }