private def commit()

in core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala [568:613]


  private def commit(commitMap: Map[TopicPartition, OffsetAndMetadata], replyTo: Vector[ActorRef]): Unit = {
    commitRefreshing.updateRefreshDeadlines(commitMap.keySet)
    commitsInProgress += 1
    val startTime = System.nanoTime()
    consumer.commitAsync(
      commitMap.asJava,
      new OffsetCommitCallback {
        override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
            exception: Exception): Unit = {
          def retryCommits(duration: Long, e: Throwable): Unit = {
            log.warning("Kafka commit is to be retried, after={} ms, commitsInProgress={}, cause={}",
              duration / 1000000L,
              commitsInProgress,
              e.toString)
            commitMaps = commitMap.toList ++ commitMaps
            commitSenders = commitSenders ++ replyTo
            requestDelayedPoll()
          }

          // this is invoked on the thread calling consumer.poll which will always be the actor, so it is safe
          val duration = System.nanoTime() - startTime
          commitsInProgress -= 1
          exception match {
            case null =>
              if (duration > settings.commitTimeWarning.toNanos) {
                log.warning("Kafka commit took longer than `commit-time-warning`: {} ms, commitsInProgress={}",
                  duration / 1000000L,
                  commitsInProgress)
              }
              progressTracker.committed(offsets)
              replyTo.foreach(_ ! Done)

            case e: RebalanceInProgressException   => retryCommits(duration, e)
            case e: RetriableCommitFailedException => retryCommits(duration, e.getCause)

            case commitException =>
              log.error("Kafka commit failed after={} ms, commitsInProgress={}, exception={}",
                duration / 1000000L,
                commitsInProgress,
                commitException)
              val failure = Status.Failure(commitException)
              replyTo.foreach(_ ! failure)
          }
        }
      })
  }