public ReconcileProgress reconcile()

in spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java [52:126]


  public ReconcileProgress reconcile(
      SparkAppContext context, SparkAppStatusRecorder statusRecorder) {
    // Purpose: for an application whose current state summary reports isInitializing(), get or
    // create the driver pre-resources, the driver pod and the remaining driver resources, then
    // persist a DriverRequested state.
    //
    // context        - supplies the SparkApplication resource, the Kubernetes client and the
    //                  resource specs to create.
    // statusRecorder - used to persist every status change made by this step.
    //
    // Returns proceed() when the application is not initializing; a delayed requeue while the
    // restart backoff has not elapsed; an immediate requeue after recording a creation failure
    // or a SchedulingFailure; otherwise the default requeue after recording DriverRequested.
    ApplicationState currentState = context.getResource().getStatus().getCurrentState();
    if (!currentState.getCurrentStateSummary().isInitializing()) {
      return proceed();
    }
    SparkApplication app = context.getResource();
    if (app.getStatus().getPreviousAttemptSummary() != null) {
      // A previous attempt exists: honor the configured restart backoff, measured from the last
      // state transition, before requesting resources again.
      Instant lastTransitionTime = Instant.parse(currentState.getLastTransitionTime());
      Instant restartTime =
          lastTransitionTime.plusMillis(
              app.getSpec()
                  .getApplicationTolerations()
                  .getRestartConfig()
                  .getRestartBackoffMillis());
      Instant now = Instant.now();
      if (restartTime.isAfter(now)) {
        return ReconcileProgress.completeAndRequeueAfter(Duration.between(now, restartTime));
      }
    }
    try {
      // Pre-resources are created before the driver pod; the created instances are collected so
      // they can be decorated once the driver pod is available.
      List<HasMetadata> createdPreResources = new ArrayList<>();
      for (HasMetadata resource : context.getDriverPreResourcesSpec()) {
        Optional<HasMetadata> createdResource =
            ReconcilerUtils.getOrCreateSecondaryResource(context.getClient(), resource);
        if (createdResource.isEmpty()) {
          return appendStateAndImmediateRequeue(
              context, statusRecorder, creationFailureState(resource));
        }
        createdPreResources.add(createdResource.get());
      }

      Pod driverPodSpec = context.getDriverPodSpec();
      Optional<Pod> driverPod =
          ReconcilerUtils.getOrCreateSecondaryResource(context.getClient(), driverPodSpec);
      if (driverPod.isEmpty()) {
        // Treat an absent driver pod like any other resource that could not be obtained, instead
        // of falling through and recording DriverRequested without a driver pod.
        return appendStateAndImmediateRequeue(
            context, statusRecorder, creationFailureState(driverPodSpec));
      }

      // NOTE(review): the decorator presumably ties each resource to the driver pod (e.g. via
      // owner references) - confirm against DriverResourceDecorator.
      DriverResourceDecorator decorator = new DriverResourceDecorator(driverPod.get());
      createdPreResources.forEach(decorator::decorate);
      // Pre-resources already exist on the cluster, so the decorated versions are re-applied.
      context.getClient().resourceList(createdPreResources).forceConflicts().serverSideApply();

      // The remaining driver resources are decorated first and only then created.
      List<HasMetadata> driverResources = context.getDriverResourcesSpec();
      driverResources.forEach(decorator::decorate);
      for (HasMetadata resource : driverResources) {
        Optional<HasMetadata> createdResource =
            ReconcilerUtils.getOrCreateSecondaryResource(context.getClient(), resource);
        if (createdResource.isEmpty()) {
          return appendStateAndImmediateRequeue(
              context, statusRecorder, creationFailureState(resource));
        }
      }

      ApplicationStatus updatedStatus =
          context
              .getResource()
              .getStatus()
              .appendNewState(
                  new ApplicationState(
                      ApplicationStateSummary.DriverRequested, Constants.DRIVER_REQUESTED_MESSAGE));
      statusRecorder.persistStatus(context, updatedStatus);
      return completeAndDefaultRequeue();
    } catch (Exception e) {
      // Any exception while requesting resources is logged and recorded on the application
      // status as a SchedulingFailure, followed by an immediate requeue.
      if (log.isErrorEnabled()) {
        log.error("Failed to request driver resource.", e);
      }
      String errorMessage =
          Constants.SCHEDULE_FAILURE_MESSAGE + " StackTrace: " + buildGeneralErrorMessage(e);
      statusRecorder.persistStatus(
          context,
          context
              .getResource()
              .getStatus()
              .appendNewState(
                  new ApplicationState(ApplicationStateSummary.SchedulingFailure, errorMessage)));
      return completeAndImmediateRequeue();
    }
  }