in spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppInitStep.java [52:126]
// Requests the driver pod and its supporting resources for an application that is still
// initializing.
//
// Flow:
//   1. If the current state summary is not initializing, hand over to the next step via
//      proceed().
//   2. If a previous attempt summary exists, honour the configured restart backoff (measured
//      from the last state transition) and requeue for the remaining time when it has not
//      elapsed yet.
//   3. Get-or-create every driver pre-resource, then the driver pod, then every driver resource.
//      Pre-resources and driver resources are passed through a DriverResourceDecorator built
//      from the driver pod, and the decorated pre-resources are re-applied server-side.
//   4. On success append the DriverRequested state and requeue with the default interval.
//
// Failure handling:
//   - Any resource (pre-resource, driver pod, driver resource) that cannot be obtained appends
//     the state produced by creationFailureState(...) and requeues immediately.
//   - Any exception appends a SchedulingFailure state carrying the error details and requeues
//     immediately.
//
// Parameters:
//   context        - reconcile context giving access to the application, the client and the
//                    resource specs to create.
//   statusRecorder - used to persist status changes of the application.
//
// Returns the progress instruction for the surrounding reconciler.
public ReconcileProgress reconcile(
    SparkAppContext context, SparkAppStatusRecorder statusRecorder) {
  SparkApplication app = context.getResource();
  ApplicationState currentState = app.getStatus().getCurrentState();
  if (!currentState.getCurrentStateSummary().isInitializing()) {
    return proceed();
  }

  // A previous attempt exists, so this is a restart: wait out the backoff before retrying.
  if (app.getStatus().getPreviousAttemptSummary() != null) {
    Instant lastTransitionTime = Instant.parse(currentState.getLastTransitionTime());
    Instant restartTime =
        lastTransitionTime.plusMillis(
            app.getSpec()
                .getApplicationTolerations()
                .getRestartConfig()
                .getRestartBackoffMillis());
    Instant now = Instant.now();
    if (restartTime.isAfter(now)) {
      return ReconcileProgress.completeAndRequeueAfter(Duration.between(now, restartTime));
    }
  }

  try {
    // Pre-resources have to exist before the driver pod is requested.
    List<HasMetadata> createdPreResources = new ArrayList<>();
    for (HasMetadata resource : context.getDriverPreResourcesSpec()) {
      Optional<HasMetadata> createdResource =
          ReconcilerUtils.getOrCreateSecondaryResource(context.getClient(), resource);
      if (createdResource.isEmpty()) {
        return appendStateAndImmediateRequeue(
            context, statusRecorder, creationFailureState(resource));
      }
      createdPreResources.add(createdResource.get());
    }

    Pod driverPodSpec = context.getDriverPodSpec();
    Optional<Pod> driverPod =
        ReconcilerUtils.getOrCreateSecondaryResource(context.getClient(), driverPodSpec);
    if (driverPod.isEmpty()) {
      // Without a driver pod nothing has actually been requested; report it the same way as
      // any other resource that could not be obtained instead of claiming DriverRequested.
      return appendStateAndImmediateRequeue(
          context, statusRecorder, creationFailureState(driverPodSpec));
    }

    // NOTE(review): the decorator presumably ties each resource to the driver pod (e.g. via an
    // owner reference) - confirm against DriverResourceDecorator.
    DriverResourceDecorator decorator = new DriverResourceDecorator(driverPod.get());

    // Pre-resources were created before the pod existed, so decorate and re-apply them now.
    createdPreResources.forEach(decorator::decorate);
    context.getClient().resourceList(createdPreResources).forceConflicts().serverSideApply();

    List<HasMetadata> driverResources = context.getDriverResourcesSpec();
    driverResources.forEach(decorator::decorate);
    for (HasMetadata resource : driverResources) {
      Optional<HasMetadata> createdResource =
          ReconcilerUtils.getOrCreateSecondaryResource(context.getClient(), resource);
      if (createdResource.isEmpty()) {
        return appendStateAndImmediateRequeue(
            context, statusRecorder, creationFailureState(resource));
      }
    }

    ApplicationStatus updatedStatus =
        context
            .getResource()
            .getStatus()
            .appendNewState(
                new ApplicationState(
                    ApplicationStateSummary.DriverRequested, Constants.DRIVER_REQUESTED_MESSAGE));
    statusRecorder.persistStatus(context, updatedStatus);
    return completeAndDefaultRequeue();
  } catch (Exception e) {
    if (log.isErrorEnabled()) {
      log.error("Failed to request driver resource.", e);
    }
    String errorMessage =
        Constants.SCHEDULE_FAILURE_MESSAGE + " StackTrace: " + buildGeneralErrorMessage(e);
    statusRecorder.persistStatus(
        context,
        context
            .getResource()
            .getStatus()
            .appendNewState(
                new ApplicationState(ApplicationStateSummary.SchedulingFailure, errorMessage)));
    return completeAndImmediateRequeue();
  }
}