in spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java [95:134]
/**
 * Builds the driver configuration used to submit the given Spark application.
 *
 * <p>The effective {@code SparkConf} is assembled in the following order:
 *
 * <ol>
 *   <li>every entry of the application spec's {@code sparkConf};
 *   <li>every entry of {@code confOverrides}, replacing spec values on conflicting keys;
 *   <li>{@code spark.kubernetes.namespace}, always set to the application's metadata namespace.
 * </ol>
 *
 * <p>{@code spark.jars}, {@code spark.submit.pyFiles}, {@code spark.master} and {@code
 * spark.app.id} are only filled in when the conf does not already contain them.
 *
 * <p>The primary resource is picked from the first non-empty spec field among jars, pyFiles and
 * sparkRFiles (in that order); when none is set, an empty Java resource is used.
 *
 * @param app the application whose spec and metadata drive the configuration
 * @param confOverrides extra conf entries applied on top of the spec's conf; may be {@code null}
 *     or empty
 * @return the driver conf created from the effective conf, the generated app id, the primary
 *     resource, the main class, the driver arguments and the optional proxy user
 */
protected SparkAppDriverConf buildDriverConf(
    SparkApplication app, Map<String, String> confOverrides) {
  ApplicationSpec applicationSpec = app.getSpec();
  SparkConf effectiveSparkConf = new SparkConf();
  // Spec-level conf goes in first so that confOverrides can replace individual keys below.
  if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) {
    for (Map.Entry<String, String> entry : applicationSpec.getSparkConf().entrySet()) {
      effectiveSparkConf.set(entry.getKey(), entry.getValue());
    }
  }
  if (MapUtils.isNotEmpty(confOverrides)) {
    for (Map.Entry<String, String> entry : confOverrides.entrySet()) {
      effectiveSparkConf.set(entry.getKey(), entry.getValue());
    }
  }
  // Unconditional set: the namespace always follows the resource's metadata, regardless of conf.
  effectiveSparkConf.set("spark.kubernetes.namespace", app.getMetadata().getNamespace());

  // First non-empty resource field wins: jars, then pyFiles, then sparkRFiles.
  MainAppResource primaryResource = new JavaMainAppResource(Option.empty());
  if (StringUtils.isNotEmpty(applicationSpec.getJars())) {
    primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars()));
    effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars());
  } else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {
    primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles());
    effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles());
  } else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) {
    primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles());
  }

  // The prefix is read after all conf entries are applied, so it can be customized via conf.
  String sparkMasterUrlPrefix =
      effectiveSparkConf.get(MASTER_URL_PREFIX_PROPS_NAME, DEFAULT_MASTER_URL_PREFIX);
  effectiveSparkConf.setIfMissing(
      "spark.master",
      sparkMasterUrlPrefix + "https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");

  String appId = generateSparkAppId(app);
  effectiveSparkConf.setIfMissing("spark.app.id", appId);

  // Defensive: treat absent driver args as "no arguments" instead of failing with an NPE.
  String[] driverArgs =
      applicationSpec.getDriverArgs() == null
          ? new String[0]
          : applicationSpec.getDriverArgs().toArray(String[]::new);

  return SparkAppDriverConf.create(
      effectiveSparkConf,
      appId,
      primaryResource,
      applicationSpec.getMainClass(),
      driverArgs,
      Option.apply(applicationSpec.getProxyUser()));
}