public ReconcileProgress reconcile()

in spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java [60:143]


  public ReconcileProgress reconcile(
      SparkAppContext context, SparkAppStatusRecorder statusRecorder) {
    ApplicationStatus currentStatus = context.getResource().getStatus();
    ApplicationTolerations tolerations =
        context.getResource().getSpec().getApplicationTolerations();
    ResourceRetainPolicy resourceRetainPolicy = tolerations.getResourceRetainPolicy();
    String stateMessage = null;

    if (retainReleaseResource(resourceRetainPolicy, currentStatus.getCurrentState())) {
      if (tolerations.getRestartConfig() != null
          && !RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy())) {
        stateMessage =
            "Application is configured to restart, resources created in current "
                + "attempt would be force released.";
        log.warn(stateMessage);
      } else {
        ApplicationState terminationState =
            new ApplicationState(
                ApplicationStateSummary.TerminatedWithoutReleaseResources,
                "Application is terminated without releasing resources as configured.");
        long requeueAfterMillis =
            tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
        return appendStateAndRequeueAfter(
            context, statusRecorder, terminationState, Duration.ofMillis(requeueAfterMillis));
      }
    }
    List<HasMetadata> resourcesToRemove = new ArrayList<>();
    if (ApplicationStateSummary.SchedulingFailure.equals(
        currentStatus.getCurrentState().getCurrentStateSummary())) {
      // if app failed at scheduling, re-compute all spec and delete as they may not be fully
      // owned by driver
      try {
        resourcesToRemove.addAll(context.getDriverPreResourcesSpec());
        resourcesToRemove.add(context.getDriverPodSpec());
        resourcesToRemove.addAll(context.getDriverResourcesSpec());
      } catch (Exception e) {
        if (log.isErrorEnabled()) {
          log.error("Failed to build resources for application.", e);
        }
        ApplicationState updatedState =
            new ApplicationState(
                ApplicationStateSummary.ResourceReleased,
                "Cannot build Spark spec for given application, "
                    + "consider all resources as released.");
        long requeueAfterMillis =
            tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
        return appendStateAndRequeueAfter(
            context, statusRecorder, updatedState, Duration.ofMillis(requeueAfterMillis));
      }
    } else {
      Optional<Pod> driver = context.getDriverPod();
      driver.ifPresent(resourcesToRemove::add);
    }
    boolean forceDelete = enableForceDelete(context.getResource());
    for (HasMetadata resource : resourcesToRemove) {
      ReconcilerUtils.deleteResourceIfExists(context.getClient(), resource, forceDelete);
    }
    ApplicationStatus updatedStatus;
    if (cleanUpSuccessStateSupplier != null) {
      ApplicationState state = cleanUpSuccessStateSupplier.get();
      if (StringUtils.isNotEmpty(stateMessage)) {
        state.setMessage(stateMessage);
      }
      long requeueAfterMillis =
          tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
      return appendStateAndRequeueAfter(
          context, statusRecorder, state, Duration.ofMillis(requeueAfterMillis));
    } else {
      updatedStatus =
          currentStatus.terminateOrRestart(
              tolerations.getRestartConfig(),
              tolerations.getResourceRetainPolicy(),
              stateMessage,
              SparkOperatorConf.TRIM_ATTEMPT_STATE_TRANSITION_HISTORY.getValue());
      long requeueAfterMillis =
          tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
      if (ApplicationStateSummary.ScheduledToRestart.equals(
          updatedStatus.getCurrentState().getCurrentStateSummary())) {
        requeueAfterMillis = tolerations.getRestartConfig().getRestartBackoffMillis();
      }
      return updateStatusAndRequeueAfter(
          context, statusRecorder, updatedStatus, Duration.ofMillis(requeueAfterMillis));
    }
  }