in spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java [47:102]
public ReconcileProgress reconcile(
    SparkClusterContext context, SparkClusterStatusRecorder statusRecorder) {
  // Creates the cluster's Kubernetes resources (master/worker Services, master/worker
  // StatefulSets and, if configured, an HPA) while the cluster is initializing. Any other
  // state is handed off to the next step untouched.
  SparkCluster cluster = context.getResource();
  ClusterState currentState = cluster.getStatus().getCurrentState();
  if (!currentState.getCurrentStateSummary().isInitializing()) {
    return proceed();
  }

  // When a previous attempt exists, wait until a fixed delay has elapsed since the last state
  // transition before re-creating resources.
  // NOTE(review): presumably a restart backoff; consider making this delay configurable.
  if (cluster.getStatus().getPreviousAttemptSummary() != null) {
    Duration restartDelay = Duration.ofSeconds(300);
    Instant restartTime =
        Instant.parse(currentState.getLastTransitionTime()).plus(restartDelay);
    Instant now = Instant.now();
    if (restartTime.isAfter(now)) {
      return completeAndRequeueAfter(Duration.between(now, restartTime));
    }
  }

  try {
    var client = context.getClient();
    // NOTE(review): create() is used for every resource; confirm that a retry after a partial
    // failure (e.g. master objects already created) is handled as intended.
    client.services().resource(context.getMasterServiceSpec()).create();
    client.services().resource(context.getWorkerServiceSpec()).create();
    client.apps().statefulSets().resource(context.getMasterStatefulSetSpec()).create();
    client.apps().statefulSets().resource(context.getWorkerStatefulSetSpec()).create();
    context
        .getHorizontalPodAutoscalerSpec()
        .ifPresent(
            hpa -> client.autoscaling().v2().horizontalPodAutoscalers().resource(hpa).create());

    ClusterStatus updatedStatus =
        cluster
            .getStatus()
            .appendNewState(new ClusterState(RunningHealthy, CLUSTER_READY_MESSAGE));
    statusRecorder.persistStatus(context, updatedStatus);
    return completeAndDefaultRequeue();
  } catch (Exception e) {
    // A failure in any of the create calls above lands here, not only the master ones.
    if (log.isErrorEnabled()) {
      log.error("Failed to request cluster resources.", e);
    }
    String msg = CLUSTER_SCHEDULE_FAILURE_MESSAGE + " StackTrace: " + buildGeneralErrorMessage(e);
    statusRecorder.persistStatus(
        context, cluster.getStatus().appendNewState(new ClusterState(SchedulingFailure, msg)));
    return completeAndImmediateRequeue();
  }
}