in spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppCleanUpStep.java [60:143]
public ReconcileProgress reconcile(
    SparkAppContext context, SparkAppStatusRecorder statusRecorder) {
  // Clean-up step for an application. In order, it:
  //   1. Short-circuits to TerminatedWithoutReleaseResources when retainReleaseResource(...)
  //      holds and no restart (other than RestartPolicy.Never) is configured.
  //   2. Collects the resources to delete: the recomputed driver specs when the current state
  //      is SchedulingFailure, otherwise only the driver pod if the context reports one.
  //   3. Deletes every collected resource via ReconcilerUtils.deleteResourceIfExists.
  //   4. Records the resulting state and requeues: either the state produced by
  //      cleanUpSuccessStateSupplier, or the outcome of ApplicationStatus.terminateOrRestart.
  ApplicationStatus appStatus = context.getResource().getStatus();
  ApplicationTolerations tolerations =
      context.getResource().getSpec().getApplicationTolerations();
  ResourceRetainPolicy retainPolicy = tolerations.getResourceRetainPolicy();
  ApplicationState observedState = appStatus.getCurrentState();

  // Stays null unless resources are released despite the retain policy asking to keep them.
  String restartNotice = null;
  if (retainReleaseResource(retainPolicy, observedState)) {
    boolean restartDisabled =
        tolerations.getRestartConfig() == null
            || RestartPolicy.Never.equals(tolerations.getRestartConfig().getRestartPolicy());
    if (restartDisabled) {
      // Nothing will restart, so honour the retain policy and leave the resources in place.
      return appendStateAndRequeueAfter(
          context,
          statusRecorder,
          new ApplicationState(
              ApplicationStateSummary.TerminatedWithoutReleaseResources,
              "Application is terminated without releasing resources as configured."),
          Duration.ofMillis(
              tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis()));
    }
    restartNotice =
        "Application is configured to restart, resources created in current "
            + "attempt would be force released.";
    log.warn(restartNotice);
  }

  List<HasMetadata> pendingDeletion = new ArrayList<>();
  boolean failedAtScheduling =
      ApplicationStateSummary.SchedulingFailure.equals(observedState.getCurrentStateSummary());
  if (failedAtScheduling) {
    // After a scheduling failure the created resources may not all be owned by the driver,
    // so the complete spec is rebuilt and each piece is deleted explicitly.
    try {
      pendingDeletion.addAll(context.getDriverPreResourcesSpec());
      pendingDeletion.add(context.getDriverPodSpec());
      pendingDeletion.addAll(context.getDriverResourcesSpec());
    } catch (Exception e) {
      if (log.isErrorEnabled()) {
        log.error("Failed to build resources for application.", e);
      }
      // The spec cannot be rebuilt, so there is nothing to delete from here; report the
      // resources as released without attempting any deletion.
      return appendStateAndRequeueAfter(
          context,
          statusRecorder,
          new ApplicationState(
              ApplicationStateSummary.ResourceReleased,
              "Cannot build Spark spec for given application, "
                  + "consider all resources as released."),
          Duration.ofMillis(
              tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis()));
    }
  } else {
    context.getDriverPod().ifPresent(pendingDeletion::add);
  }

  boolean forceDelete = enableForceDelete(context.getResource());
  pendingDeletion.forEach(
      target ->
          ReconcilerUtils.deleteResourceIfExists(context.getClient(), target, forceDelete));

  if (cleanUpSuccessStateSupplier != null) {
    // A supplier is configured: append the state it provides, carrying the restart notice
    // as its message when there is one.
    ApplicationState suppliedState = cleanUpSuccessStateSupplier.get();
    if (StringUtils.isNotEmpty(restartNotice)) {
      suppliedState.setMessage(restartNotice);
    }
    return appendStateAndRequeueAfter(
        context,
        statusRecorder,
        suppliedState,
        Duration.ofMillis(
            tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis()));
  }

  // No supplier: let the status decide between termination and a restart.
  ApplicationStatus nextStatus =
      appStatus.terminateOrRestart(
          tolerations.getRestartConfig(),
          retainPolicy,
          restartNotice,
          SparkOperatorConf.TRIM_ATTEMPT_STATE_TRANSITION_HISTORY.getValue());
  long requeueDelayMillis =
      tolerations.getApplicationTimeoutConfig().getTerminationRequeuePeriodMillis();
  boolean restartScheduled =
      ApplicationStateSummary.ScheduledToRestart.equals(
          nextStatus.getCurrentState().getCurrentStateSummary());
  if (restartScheduled) {
    // A scheduled restart is requeued after the restart backoff instead.
    requeueDelayMillis = tolerations.getRestartConfig().getRestartBackoffMillis();
  }
  return updateStatusAndRequeueAfter(
      context, statusRecorder, nextStatus, Duration.ofMillis(requeueDelayMillis));
}