in core/src/main/scala/org/apache/spark/eventhubs/utils/RetryUtils.scala [109:150]
/**
 * Runs an asynchronous operation and retries it when it fails with a transient
 * [[EventHubException]], either thrown directly or attached as the cause of
 * another throwable.
 *
 * On a transient failure, the checks are applied in this order:
 *  1. If `maxRetry` attempts have already been used, the failure is logged and the
 *     transient `EventHubException` is rethrown. For a wrapped failure this is the
 *     cause, not the wrapping throwable.
 *  1. Otherwise, if `replaceTransientErrors` is non-null, that future is returned
 *     in place of the failure and no retry is attempted.
 *  1. Otherwise the operation is re-evaluated through `after(delay.milliseconds)`.
 *
 * Any other failure is logged and rethrown unchanged.
 *
 * @param fn                     by-name operation; it is re-evaluated on every attempt
 * @param opName                 operation name used in log messages
 * @param maxRetry               number of retries allowed before the transient failure
 *                               is propagated
 * @param delay                  delay before each retry, in milliseconds
 * @param replaceTransientErrors optional future returned instead of retrying when a
 *                               transient failure occurs; `null` (the default)
 *                               means "retry"
 * @tparam T result type of the operation
 * @return a future completed with the first successful result, with the replacement
 *         future's outcome, or failed with the final error
 */
final def retryScala[T](fn: => Future[T],
                        opName: String,
                        maxRetry: Int = RetryCount,
                        delay: Int = 10,
                        replaceTransientErrors: Future[T] = null): Future[T] = {
  def retryHelper(fn: => Future[T], retryCount: Int): Future[T] = {
    val taskId = EventHubsUtils.getTaskId

    // Single place that decides what to do with a transient failure, shared by the
    // "thrown directly" and "wrapped as cause" paths below.
    def handleTransient(eh: EventHubException): Future[T] = {
      // The exhaustion check intentionally comes first: once the budget is used up
      // the error is propagated even when a replacement future was supplied.
      if (retryCount >= maxRetry) {
        logInfo(s"(TID $taskId) failure: $opName")
        throw eh
      }
      if (replaceTransientErrors != null) {
        logInfo(s"(TID $taskId) ignoring transient failure in $opName")
        replaceTransientErrors
      } else {
        logInfo(s"(TID $taskId) retrying $opName after $delay ms")
        after(delay.milliseconds)(retryHelper(fn, retryCount + 1))
      }
    }

    fn.recoverWith {
      case eh: EventHubException if eh.getIsTransient =>
        handleTransient(eh)
      case t: Throwable =>
        // A null cause does not match the typed pattern and falls through to the
        // non-transient branch.
        t.getCause match {
          case eh: EventHubException if eh.getIsTransient =>
            handleTransient(eh)
          case _ =>
            logInfo(s"(TID $taskId) failure: $opName")
            throw t
        }
    }
  }

  retryHelper(fn, 0)
}