in spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppRunningStep.java [42:91]
public ReconcileProgress reconcile(
    SparkAppContext context, SparkAppStatusRecorder statusRecorder) {
  // Decides the next application state summary from the number of ready executor pods and the
  // configured init/min executor counts. When the decision differs from the current summary a
  // new state is appended and persisted; otherwise the driver is observed.
  ExecutorInstanceConfig instanceConfig =
      context.getResource().getSpec().getApplicationTolerations().getInstanceConfig();
  ApplicationStateSummary previousSummary =
      context.getResource().getStatus().getCurrentState().getCurrentStateSummary();
  String previousMessage = context.getResource().getStatus().getCurrentState().getMessage();

  // Healthy is the default outcome; the branches below only record deviations from it.
  ApplicationStateSummary nextSummary = ApplicationStateSummary.RunningHealthy;
  String nextMessage = RUNNING_HEALTHY_MESSAGE;

  // Executors are only inspected when an instance config exists, it asks for a non-zero number
  // of initial executors, and either the app is still starting or a non-zero minimum is set.
  boolean executorCheckRequired =
      instanceConfig != null
          && instanceConfig.getInitExecutors() != 0L
          && (previousSummary.isStarting() || instanceConfig.getMinExecutors() != 0L);

  if (executorCheckRequired) {
    Set<Pod> executorPods = context.getExecutorsForApplication();
    long readyExecutors = executorPods.stream().filter(PodUtils::isPodReady).count();

    if (previousSummary.isStarting()) {
      // While starting, the ready count is measured against the initial executor count.
      if (readyExecutors < instanceConfig.getInitExecutors()) {
        if (readyExecutors > 0L) {
          nextSummary = ApplicationStateSummary.InitializedBelowThresholdExecutors;
          nextMessage = INITIALIZED_WITH_BELOW_THRESHOLD_EXECUTORS_MESSAGE;
        } else {
          // No executor is ready yet: stay in the current state with its current message.
          nextSummary = previousSummary;
          nextMessage = previousMessage;
        }
      }
    } else if (readyExecutors < instanceConfig.getMinExecutors()) {
      // Past startup, the ready count is measured against the minimum executor count.
      nextSummary = ApplicationStateSummary.RunningWithBelowThresholdExecutors;
      nextMessage = RUNNING_WITH_BELOW_THRESHOLD_EXECUTORS_MESSAGE;
    }
  }

  if (!nextSummary.equals(previousSummary)) {
    // The summary changed: append the new state, persist it, and requeue.
    statusRecorder.persistStatus(
        context,
        context
            .getResource()
            .getStatus()
            .appendNewState(new ApplicationState(nextSummary, nextMessage)));
    return completeAndDefaultRequeue();
  }

  // Summary unchanged: hand over to the driver observer.
  return observeDriver(
      context, statusRecorder, Collections.singletonList(new AppDriverRunningObserver()));
}