private static StatefulSet buildWorkerStatefulSet()

in spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java [228:294]


  private static StatefulSet buildWorkerStatefulSet(
      String scheduler,
      String name,
      String namespace,
      String version,
      String image,
      int initWorkers,
      String options,
      ObjectMeta metadata,
      StatefulSetSpec statefulSetSpec) {
    var partialStatefulSet =
        new StatefulSetBuilder()
            .withNewMetadataLike(metadata)
            .withName(name + "-worker")
            .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
            .addToLabels(LABEL_SPARK_VERSION_NAME, version)
            .withNamespace(namespace)
            .endMetadata()
            .withNewSpecLike(statefulSetSpec)
            .withPodManagementPolicy("Parallel")
            .withReplicas(initWorkers)
            .withServiceName(name + "-worker-svc")
            .editOrNewSelector()
            .addToMatchLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
            .endSelector()
            .editOrNewTemplate()
            .editOrNewMetadata()
            .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
            .addToLabels(LABEL_SPARK_VERSION_NAME, version)
            .endMetadata()
            .editOrNewSpec()
            .withSchedulerName(scheduler)
            .withTerminationGracePeriodSeconds(0L)
            .withNewDnsConfig()
            .withSearches(String.format("%s-worker-svc.%s.svc.cluster.local", name, namespace))
            .endDnsConfig();
    if (!partialStatefulSet.hasMatchingContainer(p -> "worker".equals(p.getName()))) {
      partialStatefulSet = partialStatefulSet.addNewContainer().withName("worker").endContainer();
    }
    return partialStatefulSet
        .editMatchingContainer(p -> "worker".equals(p.getName()))
        .withImage(image)
        .addNewEnv()
        .withName("SPARK_LOG_DIR")
        .withValue("/opt/spark/work/logs")
        .endEnv()
        .addNewEnv()
        .withName("SPARK_WORKER_OPTS")
        .withValue(options)
        .endEnv()
        .addToCommand("bash")
        .addToArgs(
            "-c",
            "/opt/spark/sbin/start-worker.sh spark://"
                + name
                + "-master-svc:7077 && while /opt/spark/sbin/spark-daemon.sh status "
                + "org.apache.spark.deploy.worker.Worker 1; do sleep 1; done")
        .addNewPort()
        .withName("web")
        .withContainerPort(8081)
        .endPort()
        .endContainer()
        .endSpec()
        .endTemplate()
        .endSpec()
        .build();
  }