in spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/AppReconcileStep.java [42:73]
public abstract ReconcileProgress reconcile(
SparkAppContext context, SparkAppStatusRecorder statusRecorder);
protected ReconcileProgress observeDriver(
final SparkAppContext context,
final SparkAppStatusRecorder statusRecorder,
final List<BaseAppDriverObserver> observers) {
Optional<Pod> driverPodOptional = context.getDriverPod();
SparkApplication app = context.getResource();
ApplicationStatus currentStatus = app.getStatus();
if (driverPodOptional.isPresent()) {
List<ApplicationState> stateUpdates =
observers.stream()
.map(o -> o.observe(driverPodOptional.get(), app.getSpec(), app.getStatus()))
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
if (stateUpdates.isEmpty()) {
return proceed();
} else {
for (ApplicationState state : stateUpdates) {
currentStatus = currentStatus.appendNewState(state);
}
statusRecorder.persistStatus(context, currentStatus);
return completeAndImmediateRequeue();
}
} else {
ApplicationStatus updatedStatus = currentStatus.appendNewState(driverUnexpectedRemoved());
statusRecorder.persistStatus(context, updatedStatus);
return completeAndImmediateRequeue();
}
}