public ReconcileProgress reconcile()

in spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java [42:91]


  /**
   * Derives the application's next state summary from executor readiness, then either records the
   * transition or keeps observing the driver.
   *
   * <p>{@code RunningHealthy} is proposed without looking at executor pods when no instance config
   * is set, when {@code initExecutors} is 0, or when the previous state is not a starting state
   * and {@code minExecutors} is 0. Otherwise the number of ready executor pods is compared against
   * {@code initExecutors} (previous state is starting) or {@code minExecutors} (any other previous
   * state).
   *
   * @param context reconcile context supplying the application resource and its executor pods
   * @param statusRecorder recorder used to persist the status when the state summary changes
   * @return the result of {@code observeDriver} when the proposed summary equals the previous
   *     one; otherwise the result of {@code completeAndDefaultRequeue()} after the new state has
   *     been persisted
   */
  public ReconcileProgress reconcile(
      SparkAppContext context, SparkAppStatusRecorder statusRecorder) {
    ExecutorInstanceConfig instanceConfig =
        context.getResource().getSpec().getApplicationTolerations().getInstanceConfig();
    ApplicationStateSummary previousSummary =
        context.getResource().getStatus().getCurrentState().getCurrentStateSummary();
    // Starts as the current message; only replaced when a branch below selects a new state.
    String nextMessage = context.getResource().getStatus().getCurrentState().getMessage();
    ApplicationStateSummary nextSummary;

    // No executor threshold applies: nothing configured, no initial executors requested, or the
    // app is already past startup and has no minimum executor requirement.
    boolean noThresholdToEnforce =
        instanceConfig == null
            || instanceConfig.getInitExecutors() == 0L
            || (!previousSummary.isStarting() && instanceConfig.getMinExecutors() == 0L);

    if (noThresholdToEnforce) {
      nextSummary = ApplicationStateSummary.RunningHealthy;
      nextMessage = RUNNING_HEALTHY_MESSAGE;
    } else {
      long readyExecutors =
          context.getExecutorsForApplication().stream().filter(PodUtils::isPodReady).count();
      boolean starting = previousSummary.isStarting();
      // While starting the bar is initExecutors; afterwards it is minExecutors.
      long requiredExecutors =
          starting ? instanceConfig.getInitExecutors() : instanceConfig.getMinExecutors();

      if (readyExecutors >= requiredExecutors) {
        nextSummary = ApplicationStateSummary.RunningHealthy;
        nextMessage = RUNNING_HEALTHY_MESSAGE;
      } else if (!starting) {
        nextSummary = ApplicationStateSummary.RunningWithBelowThresholdExecutors;
        nextMessage = RUNNING_WITH_BELOW_THRESHOLD_EXECUTORS_MESSAGE;
      } else if (readyExecutors > 0L) {
        nextSummary = ApplicationStateSummary.InitializedBelowThresholdExecutors;
        nextMessage = INITIALIZED_WITH_BELOW_THRESHOLD_EXECUTORS_MESSAGE;
      } else {
        // Still starting with zero ready executors: stay in the previous state.
        nextSummary = previousSummary;
      }
    }

    // Unchanged summary: nothing to persist, continue observing the driver.
    if (nextSummary.equals(previousSummary)) {
      return observeDriver(
          context, statusRecorder, Collections.singletonList(new AppDriverRunningObserver()));
    }

    statusRecorder.persistStatus(
        context,
        context
            .getResource()
            .getStatus()
            .appendNewState(new ApplicationState(nextSummary, nextMessage)));
    return completeAndDefaultRequeue();
  }