public ReconcileProgress reconcile()

in spark-operator/src/main/java/org/apache/spark/k8s/operator/reconciler/reconcilesteps/ClusterInitStep.java [47:102]


  /**
   * Creates the Kubernetes resources for a cluster whose current state is still initializing.
   *
   * <p>The step returns {@code proceed()} without doing anything when the current state summary
   * is not initializing. If the cluster status carries a previous attempt summary, resource
   * creation is postponed until 300 seconds after the last state transition time. Otherwise the
   * master service, worker service, master stateful set, worker stateful set and (when present)
   * the horizontal pod autoscaler are created in that order. On success a {@code RunningHealthy}
   * state is appended and persisted; if anything in the creation path throws, a
   * {@code SchedulingFailure} state carrying the error details is appended and persisted instead.
   *
   * @param context reconcile context providing the cluster resource, the resource specs to
   *     create and the Kubernetes client
   * @param statusRecorder recorder used to persist the updated cluster status
   * @return the progress outcome: {@code proceed()} when not initializing, a delayed requeue
   *     while the restart wait has not elapsed, a default requeue after successful creation, or
   *     an immediate requeue after a failure
   */
  public ReconcileProgress reconcile(
      SparkClusterContext context, SparkClusterStatusRecorder statusRecorder) {
    ClusterState state = context.getResource().getStatus().getCurrentState();
    if (!state.getCurrentStateSummary().isInitializing()) {
      // Not in an initializing state: this step has nothing to do.
      return proceed();
    }

    SparkCluster sparkCluster = context.getResource();
    boolean hasPreviousAttempt = sparkCluster.getStatus().getPreviousAttemptSummary() != null;
    if (hasPreviousAttempt) {
      // A previous attempt exists: wait until 300 seconds after the last transition time.
      Instant transitionedAt = Instant.parse(state.getLastTransitionTime());
      Instant earliestRestart = transitionedAt.plusMillis(300 * 1000);
      Instant current = Instant.now();
      if (current.isBefore(earliestRestart)) {
        return completeAndRequeueAfter(Duration.between(current, earliestRestart));
      }
    }

    try {
      // Services first, then the stateful sets, master before worker in both cases.
      Service masterSvc = context.getMasterServiceSpec();
      context.getClient().services().resource(masterSvc).create();
      Service workerSvc = context.getWorkerServiceSpec();
      context.getClient().services().resource(workerSvc).create();

      StatefulSet masterSet = context.getMasterStatefulSetSpec();
      context.getClient().apps().statefulSets().resource(masterSet).create();
      StatefulSet workerSet = context.getWorkerStatefulSetSpec();
      context.getClient().apps().statefulSets().resource(workerSet).create();

      // The autoscaler is optional and only created when the context supplies one.
      var autoscaler = context.getHorizontalPodAutoscalerSpec();
      if (autoscaler.isPresent()) {
        var autoscalerApi = context.getClient().autoscaling().v2().horizontalPodAutoscalers();
        autoscalerApi.resource(autoscaler.get()).create();
      }

      ClusterStatus statusBeforeReady = context.getResource().getStatus();
      ClusterStatus readyStatus =
          statusBeforeReady.appendNewState(
              new ClusterState(RunningHealthy, CLUSTER_READY_MESSAGE));
      statusRecorder.persistStatus(context, readyStatus);
      return completeAndDefaultRequeue();
    } catch (Exception failure) {
      if (log.isErrorEnabled()) {
        log.error("Failed to request master resource.", failure);
      }
      String failureMessage =
          CLUSTER_SCHEDULE_FAILURE_MESSAGE + " StackTrace: " + buildGeneralErrorMessage(failure);
      ClusterStatus statusBeforeFailure = context.getResource().getStatus();
      ClusterStatus failedStatus =
          statusBeforeFailure.appendNewState(
              new ClusterState(SchedulingFailure, failureMessage));
      statusRecorder.persistStatus(context, failedStatus);
      return completeAndImmediateRequeue();
    }
  }