in core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala [568:613]
/**
 * Asynchronously commits the given offsets and notifies the waiting senders once the commit completes.
 *
 * Before the commit is issued, the refresh deadlines of the affected partitions are updated and
 * `commitsInProgress` is incremented; the counter is decremented again as soon as the callback fires.
 *
 * Callback outcomes:
 *  - success: the committed offsets are handed to `progressTracker` and every `replyTo` receives `Done`
 *  - `RebalanceInProgressException` or `RetriableCommitFailedException`: the offsets and senders are
 *    put back into `commitMaps` / `commitSenders` and a delayed poll is requested
 *  - any other exception: logged as an error and every `replyTo` receives a `Status.Failure`
 *
 * @param commitMap offsets to commit, keyed by partition
 * @param replyTo actors to be informed about the outcome of this commit
 */
private def commit(commitMap: Map[TopicPartition, OffsetAndMetadata], replyTo: Vector[ActorRef]): Unit = {
  commitRefreshing.updateRefreshDeadlines(commitMap.keySet)
  commitsInProgress += 1
  val startTime = System.nanoTime()
  consumer.commitAsync(
    commitMap.asJava,
    new OffsetCommitCallback {
      override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
          exception: Exception): Unit = {
        // Re-queues this commit (offsets first, senders last) and asks for a delayed poll.
        def retryCommits(duration: Long, e: Throwable): Unit = {
          log.warning("Kafka commit is to be retried, after={} ms, commitsInProgress={}, cause={}",
            duration / 1000000L,
            commitsInProgress,
            e.toString)
          commitMaps = commitMap.toList ++ commitMaps
          commitSenders = commitSenders ++ replyTo
          requestDelayedPoll()
        }
        // this is invoked on the thread calling consumer.poll which will always be the actor, so it is safe
        val duration = System.nanoTime() - startTime
        commitsInProgress -= 1
        exception match {
          // the Java callback signals success with a null exception
          case null =>
            if (duration > settings.commitTimeWarning.toNanos) {
              log.warning("Kafka commit took longer than `commit-time-warning`: {} ms, commitsInProgress={}",
                duration / 1000000L,
                commitsInProgress)
            }
            progressTracker.committed(offsets)
            replyTo.foreach(_ ! Done)
          case e: RebalanceInProgressException => retryCommits(duration, e)
          case e: RetriableCommitFailedException =>
            // getCause is null when the exception was built from a message only; logging
            // `null.toString` would throw before the commit is re-queued, so fall back to `e`
            val cause: Throwable = Option(e.getCause).getOrElse(e)
            retryCommits(duration, cause)
          case commitException =>
            log.error("Kafka commit failed after={} ms, commitsInProgress={}, exception={}",
              duration / 1000000L,
              commitsInProgress,
              commitException)
            val failure = Status.Failure(commitException)
            replyTo.foreach(_ ! failure)
        }
      }
    })
}