final def retryScala[T]()

in core/src/main/scala/org/apache/spark/eventhubs/utils/RetryUtils.scala [109:150]


  /**
   * Evaluates `fn` and, if the resulting future fails with a transient
   * `EventHubException`, either retries it after a delay or substitutes a
   * caller-supplied future.
   *
   * A failure counts as transient when the failure itself, or its immediate
   * cause (`getCause`), is an `EventHubException` whose `getIsTransient` is true.
   * Any other failure is logged and rethrown unchanged, without retrying.
   *
   * @param fn                     by-name operation; it is re-evaluated on every attempt
   * @param opName                 operation name, used only in log messages
   * @param maxRetry               number of retries allowed after the first attempt; once the
   *                               retry count reaches this value the transient exception is
   *                               rethrown (so `fn` is evaluated at most `maxRetry + 1` times)
   * @param delay                  pause before each retry, in milliseconds
   * @param replaceTransientErrors when non-null, this future is returned on a transient failure
   *                               instead of scheduling a retry (the retry-limit check still
   *                               runs first)
   * @tparam T result type of the operation
   * @return a future holding the result of `fn`, the replacement future, or the failure.
   *         When the transient exception was found as the cause of another throwable and the
   *         retry limit is reached, the cause (not the wrapper) is what gets rethrown.
   */
  final def retryScala[T](fn: => Future[T],
                          opName: String,
                          maxRetry: Int = RetryCount,
                          delay: Int = 10,
                          replaceTransientErrors: Future[T] = null): Future[T] = {
    def attempt(retryCount: Int): Future[T] = {
      // Resolved once per attempt, before `fn` is evaluated, so every log line of this
      // attempt reports the same task id.
      val taskId = EventHubsUtils.getTaskId

      // Single place for the transient-failure policy; used both when the failure itself
      // is transient and when the transient exception is the failure's cause.
      def onTransient(eh: EventHubException): Future[T] =
        if (retryCount >= maxRetry) {
          logInfo(s"(TID $taskId) failure: $opName")
          // Thrown inside the recoverWith handler; surfaces as the failure of the result.
          throw eh
        } else if (replaceTransientErrors != null) {
          logInfo(s"(TID $taskId) ignoring transient failure in $opName")
          replaceTransientErrors
        } else {
          logInfo(s"(TID $taskId) retrying $opName after $delay ms")
          after(delay.milliseconds)(attempt(retryCount + 1))
        }

      fn.recoverWith {
        case eh: EventHubException if eh.getIsTransient =>
          onTransient(eh)
        case t: Throwable =>
          // Only the immediate cause is inspected; a null cause falls through to `case _`.
          t.getCause match {
            case eh: EventHubException if eh.getIsTransient =>
              onTransient(eh)
            case _ =>
              logInfo(s"(TID $taskId) failure: $opName")
              throw t
          }
      }
    }

    attempt(0)
  }