in spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java [228:294]
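  /**
   * Builds the StatefulSet that runs the standalone Spark workers of a cluster.
   *
   * <p>Starting from the user-provided {@code StatefulSetSpec}, it stamps the operator-managed
   * name, namespace and role/version labels on the metadata, sets parallel pod management with
   * {@code initWorkers} replicas behind the governing service {@code <name>-worker-svc}, and
   * configures a "worker" container that starts a standalone worker against
   * {@code spark://<name>-master-svc:7077}.
   */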
  private static StatefulSet buildWorkerStatefulSet(
      String scheduler,
      String name,
      String namespace,
      String version,
      String image,
      int initWorkers,
      String options,
      ObjectMeta metadata,
      StatefulSetSpec statefulSetSpec) {
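    // Start from the given StatefulSetSpec and layer the operator-managed settings on top of it.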
    var partialStatefulSet =
        new StatefulSetBuilder()
            .withNewMetadataLike(metadata)
            .withName(name + "-worker")
            .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
            .addToLabels(LABEL_SPARK_VERSION_NAME, version)
            .withNamespace(namespace)
            .endMetadata()
            .withNewSpecLike(statefulSetSpec)
            // Start all worker pods at once instead of the default ordered, one-by-one rollout.
            .withPodManagementPolicy("Parallel")
            .withReplicas(initWorkers)
            .withServiceName(name + "-worker-svc")
            .editOrNewSelector()
            .addToMatchLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
            .endSelector()
            .editOrNewTemplate()
            .editOrNewMetadata()
            .addToLabels(LABEL_SPARK_ROLE_NAME, LABEL_SPARK_ROLE_WORKER_VALUE)
            .addToLabels(LABEL_SPARK_VERSION_NAME, version)
            .endMetadata()
            .editOrNewSpec()
            .withSchedulerName(scheduler)
            .withTerminationGracePeriodSeconds(0L)
            // Add the worker service domain to the pod DNS search list so that short worker
            // hostnames resolve inside the cluster.
            .withNewDnsConfig()
            .withSearches(String.format("%s-worker-svc.%s.svc.cluster.local", name, namespace))
            .endDnsConfig();
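    // The supplied spec may already declare a "worker" container; add one only if it is missing.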
    if (!partialStatefulSet.hasMatchingContainer(p -> "worker".equals(p.getName()))) {
      partialStatefulSet = partialStatefulSet.addNewContainer().withName("worker").endContainer();
    }
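    // Configure the worker container: image, environment, startup command and web UI port.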
    return partialStatefulSet
        .editMatchingContainer(p -> "worker".equals(p.getName()))
        .withImage(image)
        .addNewEnv()
        .withName("SPARK_LOG_DIR")
        .withValue("/opt/spark/work/logs")
        .endEnv()
        .addNewEnv()
        .withName("SPARK_WORKER_OPTS")
        .withValue(options)
        .endEnv()
        // start-worker.sh daemonizes the worker, so keep the container alive by polling the
        // daemon status; the loop ends when the worker process dies.
        .addToCommand("bash")
        .addToArgs(
            "-c",
            "/opt/spark/sbin/start-worker.sh spark://"
                + name
                + "-master-svc:7077 && while /opt/spark/sbin/spark-daemon.sh status "
                + "org.apache.spark.deploy.worker.Worker 1; do sleep 1; done")
        // Expose the worker web UI on its default port.
        .addNewPort()
        .withName("web")
        .withContainerPort(8081)
        .endPort()
        .endContainer()
        .endSpec()
        .endTemplate()
        .endSpec()
        .build();
  }
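
// ---------------------------------------------------------------------------------------------
// Illustrative sketch only, not part of SparkClusterResourceSpec: the same
// hasMatchingContainer / editMatchingContainer guard used above, shown in isolation on a plain
// PodTemplateSpecBuilder. The class and method names below are hypothetical.
// ---------------------------------------------------------------------------------------------
import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import io.fabric8.kubernetes.api.model.PodTemplateSpecBuilder;

class WorkerContainerEditSketch {
  static PodTemplateSpec ensureWorkerContainer(PodTemplateSpec template, String image) {
    var spec = new PodTemplateSpecBuilder(template).editOrNewSpec();
    // Add a "worker" container only when the incoming template does not define one yet.
    if (!spec.hasMatchingContainer(c -> "worker".equals(c.getName()))) {
      spec = spec.addNewContainer().withName("worker").endContainer();
    }
    // editMatchingContainer reopens the matched container so its fields can be overridden.
    return spec
        .editMatchingContainer(c -> "worker".equals(c.getName()))
        .withImage(image)
        .endContainer()
        .endSpec()
        .build();
  }
}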