def consumerAtLeastOnceBatched()

in benchmarks/src/main/scala/org/apache/pekko/kafka/benchmarks/KafkaConsumerBenchmarks.scala [77:129]


  /**
   * Reads `fixture.msgCount` records with the fixture's consumer, marking `meter` once per record
   * and asynchronously committing the processed offsets each time at least `batchSize` records
   * have accumulated.
   *
   * At most one asynchronous commit is kept in flight. If another batch fills up while a commit is
   * still pending, fetching is paused on the currently assigned partitions and resumed on the
   * first poll after the commit callback has fired.
   *
   * The fixture is closed once the read limit has been reached. Records accumulated after the last
   * commit are not committed.
   *
   * @param batchSize number of records to accumulate before a commit is attempted
   * @param fixture   supplies the consumer, the topic name and the number of records to read
   * @param meter     marked once for every record read
   */
  def consumerAtLeastOnceBatched(batchSize: Int)(fixture: KafkaConsumerTestFixture, meter: Meter): Unit = {
    val consumer = fixture.consumer

    var lastProcessedOffset = Map.empty[Int, Long]
    var accumulatedMsgCount = 0L
    var commitInProgress = false
    // Tracks whether this method has paused fetching, so pause/resume are issued only on transitions.
    var fetchingPaused = false

    // Sends the offsets gathered so far as one asynchronous commit and resets the batch state.
    def doCommit(): Unit = {
      accumulatedMsgCount = 0
      val offsetMap = lastProcessedOffset.map {
        case (partition, offset) =>
          // Kafka expects the committed offset to be the position of the NEXT record to consume,
          // i.e. last processed offset + 1; committing `offset` itself would replay that record.
          new TopicPartition(fixture.topic, partition) -> new OffsetAndMetadata(offset + 1)
      }
      offsetMap.headOption.foreach {
        case (_, committed) => logger.debug("Committing offset " + committed.offset())
      }
      consumer.commitAsync(
        offsetMap.asJava,
        new OffsetCommitCallback {
          override def onComplete(map: util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit = {
            // The exception is null on success (Java API); report failures instead of dropping them.
            if (e != null)
              logger.warn("Offset commit failed", e)
            commitInProgress = false
          }
        })
      lastProcessedOffset = Map.empty[Int, Long]
    }

    // Polls until at least `readLimit` records have been read; returns the number actually read.
    @tailrec
    def pollInLoop(readLimit: Int, readSoFar: Int = 0): Int =
      if (readSoFar >= readLimit)
        readSoFar
      else {
        logger.debug("Polling")
        if (fetchingPaused && !commitInProgress) {
          // Resume exactly the partitions that are paused right now.
          consumer.resume(consumer.paused())
          fetchingPaused = false
        }
        val records = consumer.poll(pollTimeoutMs)
        for (record <- records.iterator().asScala) {
          accumulatedMsgCount = accumulatedMsgCount + 1
          meter.mark()
          lastProcessedOffset += record.partition() -> record.offset()
          if (accumulatedMsgCount >= batchSize) {
            if (!commitInProgress) {
              commitInProgress = true
              doCommit()
            } else if (!fetchingPaused) {
              // Previous commit still in progress: stop fetching until it completes. The
              // assignment is read here rather than once up front, because a snapshot taken
              // before the first poll can be empty.
              consumer.pause(consumer.assignment())
              fetchingPaused = true
            }
          }
        }
        val recordCount = records.count()
        logger.debug(s"${readSoFar + recordCount} records read. Limit = $readLimit")
        pollInLoop(readLimit, readSoFar + recordCount)
      }

    pollInLoop(readLimit = fixture.msgCount)
    fixture.close()
  }