in core/src/main/scala/org/apache/pekko/kafka/internal/ConnectionChecker.scala [45:63]
/**
 * Receive handler for the backoff state.
 *
 * Delegates message handling to [[behaviour]] and wraps it in a `LoggingReceive`
 * whose label carries the current attempt count and check interval.
 *
 * @param failedAttempts failure count forwarded to [[behaviour]] (defaults to 1)
 * @param backoffCheckInterval interval forwarded to [[behaviour]]
 * @return the labelled receive handler
 */
def backoff(failedAttempts: Int = 1, backoffCheckInterval: FiniteDuration): Receive = {
  val stateLabel = s"backoff($failedAttempts, $backoffCheckInterval)"
  val handler = behaviour(failedAttempts, backoffCheckInterval)
  LoggingReceive.withLabel(stateLabel)(handler)
}
/**
 * Message handling shared by the checker's states.
 *
 *  - `CheckConnection`: sends `Metadata.ListTopics` to the parent actor.
 *  - `Metadata.Topics` failed with a `TimeoutException`: when `failedAttempts`
 *    equals `maxRetries`, reports `KafkaConnectionFailed` to the parent and stops
 *    this actor; otherwise switches to [[backoff]] with one more failed attempt.
 *  - `Metadata.Topics` succeeded: calls `startTimer()` and switches to `regular`.
 *
 * Failures carrying any other exception type, and any other message, are not
 * matched by this handler.
 *
 * @param failedAttempts per the original author's note, the first triggered failure
 *                       plus the retries made so far
 * @param interval interval passed to `startBackoffTimer` when another retry is scheduled
 * @return the partial function handling the messages listed above
 */
def behaviour(failedAttempts: Int, interval: FiniteDuration): Receive = {
  case CheckConnection =>
    context.parent ! Metadata.ListTopics

  case Metadata.Topics(Failure(timeout: TimeoutException)) =>
    val retriesExhausted = failedAttempts == maxRetries
    if (!retriesExhausted) {
      // The value returned by startBackoffTimer is the interval of the next backoff state.
      val nextInterval = startBackoffTimer(interval)
      context.become(backoff(failedAttempts + 1, nextInterval))
    } else {
      context.parent ! KafkaConnectionFailed(timeout, maxRetries)
      context.stop(self)
    }

  case Metadata.Topics(Success(_)) =>
    startTimer()
    context.become(regular)
}