def backoff()

in core/src/main/scala/org/apache/pekko/kafka/internal/ConnectionChecker.scala [45:63]


  def backoff(failedAttempts: Int = 1, backoffCheckInterval: FiniteDuration): Receive =
    LoggingReceive.withLabel(s"backoff($failedAttempts, $backoffCheckInterval)")(
      behaviour(failedAttempts, backoffCheckInterval))

  def behaviour(failedAttempts: Int, interval: FiniteDuration): Receive = {
    case CheckConnection =>
      context.parent ! Metadata.ListTopics

    case Metadata.Topics(Failure(te: TimeoutException)) =>
      // failedAttempts is a sum of first triggered failure and retries (retries + 1)
      if (failedAttempts == maxRetries) {
        context.parent ! KafkaConnectionFailed(te, maxRetries)
        context.stop(self)
      } else context.become(backoff(failedAttempts + 1, startBackoffTimer(interval)))

    case Metadata.Topics(Success(_)) =>
      startTimer()
      context.become(regular)
  }