in spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkClusterResourceSpec.java [57:107]
public SparkClusterResourceSpec(SparkCluster cluster, SparkConf conf) {
String clusterNamespace = cluster.getMetadata().getNamespace();
String clusterName = cluster.getMetadata().getName();
String scheduler = conf.get(Config.KUBERNETES_SCHEDULER_NAME().key(), "default-scheduler");
String namespace = conf.get(Config.KUBERNETES_NAMESPACE().key(), clusterNamespace);
String image = conf.get(Config.CONTAINER_IMAGE().key(), "apache/spark:4.0.0-preview2");
ClusterSpec spec = cluster.getSpec();
String version = spec.getRuntimeVersions().getSparkVersion();
StringBuilder options = new StringBuilder();
for (Tuple2<String, String> t : conf.getAll()) {
options.append(String.format("-D%s=\"%s\" ", t._1, t._2));
}
MasterSpec masterSpec = spec.getMasterSpec();
WorkerSpec workerSpec = spec.getWorkerSpec();
masterService =
buildMasterService(
clusterName,
namespace,
version,
masterSpec.getServiceMetadata(),
masterSpec.getServiceSpec());
workerService =
buildWorkerService(
clusterName,
namespace,
version,
workerSpec.getServiceMetadata(),
workerSpec.getServiceSpec());
masterStatefulSet =
buildMasterStatefulSet(
scheduler,
clusterName,
namespace,
version,
image,
options.toString(),
masterSpec.getStatefulSetMetadata(),
masterSpec.getStatefulSetSpec());
workerStatefulSet =
buildWorkerStatefulSet(
scheduler,
clusterName,
namespace,
version,
image,
spec.getClusterTolerations().getInstanceConfig().getInitWorkers(),
options.toString(),
workerSpec.getStatefulSetMetadata(),
workerSpec.getStatefulSetSpec());
horizontalPodAutoscaler = buildHorizontalPodAutoscaler(clusterName, namespace, spec);
}