in spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/observers/AppDriverTimeoutObserver.java [55:88]
public Optional<ApplicationState> observe(
Pod driver, ApplicationSpec spec, ApplicationStatus status) {
long timeoutThreshold;
Supplier<ApplicationState> supplier;
ApplicationTimeoutConfig timeoutConfig =
spec.getApplicationTolerations().getApplicationTimeoutConfig();
ApplicationState state = status.getCurrentState();
switch (state.getCurrentStateSummary()) {
case DriverRequested -> {
timeoutThreshold = timeoutConfig.getDriverStartTimeoutMillis();
supplier = SparkAppStatusUtils::driverLaunchTimedOut;
}
case DriverStarted -> {
timeoutThreshold = timeoutConfig.getDriverReadyTimeoutMillis();
supplier = SparkAppStatusUtils::driverReadyTimedOut;
}
case DriverReady, InitializedBelowThresholdExecutors -> {
timeoutThreshold = timeoutConfig.getExecutorStartTimeoutMillis();
supplier = SparkAppStatusUtils::executorLaunchTimedOut;
}
default -> {
// No timeout check needed for other states
return Optional.empty();
}
}
Instant lastTransitionTime = Instant.parse(state.getLastTransitionTime());
if (timeoutThreshold > 0L
&& lastTransitionTime.plusMillis(timeoutThreshold).isBefore(Instant.now())) {
ApplicationState appState = supplier.get();
appState.setLastObservedDriverStatus(driver.getStatus());
return Optional.of(appState);
}
return Optional.empty();
}