def applyRecovery()

in core/src/main/scala/org/apache/pekko/projection/internal/HandlerRecoveryImpl.scala [63:191]


  def applyRecovery(
      env: Envelope,
      firstOffset: Offset, // used for logging
      lastOffset: Offset, // used for logging
      abort: Future[Done], // retries can be aborted by failing this Future
      futureCallback: () => Future[Done],
      onSkip: () => Future[Done] = HandlerRecoveryImpl.defaultOnSkip)(implicit system: ActorSystem[_]): Future[Done] = {
    import HandlerRecoveryStrategy.Internal._

    implicit val scheduler: Scheduler = system.classicSystem.scheduler
    implicit val executionContext: ExecutionContext = system.executionContext

    val tryFutureCallback: () => Future[Done] = { () =>
      if (abort.isCompleted) {
        abort
      } else {
        try {
          futureCallback()
        } catch {
          case NonFatal(e) =>
            // in case the callback throws instead of returning failed Future
            Future.failed(e)
        }
      }
    }

    val retryFutureCallback: () => Future[Done] = { () =>
      tryFutureCallback().recoverWith {
        // using recoverWith instead of `.failed.foreach` to make sure that calls to statusObserver
        // are invoked in sequential order
        case err if !abort.isCompleted =>
          telemetry.error(err)
          statusObserver.error(projectionId, env, err, recoveryStrategy)
          Future.failed(err)
      }
    }

    // this will count as one attempt
    val firstAttempt = tryFutureCallback()

    def offsetLogParameter: String =
      if (firstOffset == lastOffset) s"envelope with offset [$firstOffset]"
      else s"envelopes with offsets from [$firstOffset] to [$lastOffset]"

    firstAttempt.recoverWith {
      case _ if abort.isCompleted =>
        abort
      case err =>
        telemetry.error(err)
        statusObserver.error(projectionId, env, err, recoveryStrategy)
        def delayFunction(delay: FiniteDuration): Int => Option[FiniteDuration] = { _ =>
          abort.value match {
            case None             => Some(delay)
            case Some(Success(_)) =>
              // abort immediately, will return the completed `abort: Future` from tryFutureCallback
              Some(Duration.Zero)
            case Some(Failure(abortErr)) =>
              // by throwing from the delay function the `retry` will stop
              throw abortErr
          }
        }

        recoveryStrategy match {
          case Fail =>
            logger.error(
              cause = err,
              template = "[{}] Failed to process {}. Projection will stop as defined by recovery strategy.",
              projectionId.id,
              offsetLogParameter)
            firstAttempt

          case Skip =>
            logger.warning(
              "[{}] Failed to process {}. " +
              "Envelope will be skipped as defined by recovery strategy. Exception: {}",
              projectionId.id,
              offsetLogParameter,
              err)
            onSkip()

          case RetryAndFail(retries, delay) =>
            logger.warning(
              "[{}] First attempt to process {} failed. Will retry [{}] time(s). " +
              "Exception: {}",
              projectionId.id,
              offsetLogParameter,
              retries,
              err)

            // retries - 1 because retry() is based on attempts
            // first attempt is performed immediately and therefore we must first delay
            val retried = after(delay, scheduler)(retry(retryFutureCallback, retries - 1, delayFunction(delay)))
            retried.failed.foreach { exception =>
              if (!abort.isCompleted)
                logger.error(
                  cause = exception,
                  template =
                    "[{}] Failed to process {} after [{}] attempts. " +
                    "Projection will stop as defined by recovery strategy.",
                  projectionId,
                  offsetLogParameter,
                  retries + 1)
            }
            retried

          case RetryAndSkip(retries, delay) =>
            logger.warning(
              "[{}] First attempt to process {} failed. Will retry [{}] time(s). Exception: {}",
              projectionId.id,
              offsetLogParameter,
              retries,
              err)

            // retries - 1 because retry() is based on attempts
            // first attempt is performed immediately and therefore we must first delay
            val retried = after(delay, scheduler)(retry(retryFutureCallback, retries - 1, delayFunction(delay)))
            retried.failed.foreach { exception =>
              logger.warning(
                "[{}] Failed to process {} after [{}] attempts. " +
                "Envelope will be skipped as defined by recovery strategy. Last exception: {}",
                projectionId.id,
                offsetLogParameter,
                retries + 1,
                exception)
            }
            retried.recoverWith { case _ => onSkip() }
        }
    }
  }