in dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java [125:229]
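/**
 * Assemble the argument list for the spark-submit / spark-sql command line from this
 * task's SparkParameters and execution context.
 */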
private List<String> populateSparkOptions() {
    List<String> args = new ArrayList<>();
    // see https://spark.apache.org/docs/latest/submitting-applications.html
    // TODO remove the option 'local' from deploy-mode
    String deployMode = StringUtils.isNotEmpty(sparkParameters.getDeployMode()) ? sparkParameters.getDeployMode()
            : SparkConstants.DEPLOY_MODE_LOCAL;
    boolean onLocal = SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode);
    boolean onNativeKubernetes = StringUtils.isNotEmpty(sparkParameters.getNamespace());
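    // Resolve the master URL: an explicit master always wins; otherwise fall back to
    // 'local' in local mode, a k8s:// URL derived from the task's kubeconfig on
    // native Kubernetes, or YARN by default.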
    String masterUrl = StringUtils.isNotEmpty(sparkParameters.getMaster()) ? sparkParameters.getMaster()
            : onLocal ? deployMode
                    : onNativeKubernetes
                            ? SPARK_ON_K8S_MASTER_PREFIX + Config
                                    .fromKubeconfig(
                                            taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml())
                                    .getMasterUrl()
                            : SparkConstants.SPARK_ON_YARN;
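    // --master goes first; --deploy-mode is only valid for non-local submissions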
    args.add(SparkConstants.MASTER);
    args.add(masterUrl);
    if (!onLocal) {
        args.add(SparkConstants.DEPLOY_MODE);
        args.add(deployMode);
    }
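    // --class is only meaningful for JVM (Java/Scala) jobs; PySpark and SQL have no user main class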
    ProgramType programType = sparkParameters.getProgramType();
    String mainClass = sparkParameters.getMainClass();
    if (programType != ProgramType.PYTHON && programType != ProgramType.SQL && StringUtils.isNotEmpty(mainClass)) {
        args.add(SparkConstants.MAIN_CLASS);
        args.add(mainClass);
    }
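    // driver/executor resource flags (cores, memory, executor count) are appended by this helper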
    populateSparkResourceDefinitions(args);
    String appName = sparkParameters.getAppName();
    if (StringUtils.isNotEmpty(appName)) {
        args.add(SparkConstants.SPARK_NAME);
        args.add(ArgsUtils.escape(appName));
    }
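    // --queue: only for non-local submissions, and never overriding a queue the user already set in 'others'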
    String others = sparkParameters.getOthers();
    if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)
            && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_YARN_QUEUE))) {
        String yarnQueue = sparkParameters.getYarnQueue();
        if (StringUtils.isNotEmpty(yarnQueue)) {
            args.add(SparkConstants.SPARK_YARN_QUEUE);
            args.add(yarnQueue);
        }
    }
    // pass through any additional user options verbatim (--conf, --files, --jars, --packages, ...)
    if (StringUtils.isNotEmpty(others)) {
        args.add(others);
    }
    // on native Kubernetes, label the driver pod with the task app id and pin the target namespace
    if (onNativeKubernetes) {
        args.add(String.format(DRIVER_LABEL_CONF, UNIQUE_LABEL_NAME, taskExecutionContext.getTaskAppId()));
        args.add(String.format(SPARK_KUBERNETES_NAMESPACE,
                JSONUtils.toMap(sparkParameters.getNamespace()).get(NAMESPACE_NAME)));
    }
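    // the main application resource (a jar, or the script file for PySpark), resolved to its
    // local path; SQL jobs have no main resource and are handled below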
    ResourceInfo mainJar = sparkParameters.getMainJar();
    if (programType != ProgramType.SQL) {
        ResourceContext resourceContext = taskExecutionContext.getResourceContext();
        args.add(resourceContext.getResourceItem(mainJar.getResourceName()).getResourceAbsolutePathInLocal());
    }
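    // user-supplied program arguments are appended verbatim after the main resource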
    String mainArgs = sparkParameters.getMainArgs();
    if (programType != ProgramType.SQL && StringUtils.isNotEmpty(mainArgs)) {
        args.add(mainArgs);
    }
    // bin/spark-sql -f fileName
    if (ProgramType.SQL == programType) {
        String sqlContent = "";
        String resourceFileName = "";
        args.add(SparkConstants.SQL_FROM_FILE);
        if (SparkConstants.TYPE_FILE.equals(sparkParameters.getSqlExecutionType())) {
            final List<ResourceInfo> resourceInfos = sparkParameters.getResourceList();
            if (resourceInfos.size() > 1) {
                log.warn("More than one SQL file detected, using the first one by default");
            }
            try {
                resourceFileName = resourceInfos.get(0).getResourceName();
                ResourceContext resourceContext = taskExecutionContext.getResourceContext();
                sqlContent = FileUtils.readFileToString(
                        new File(
                                resourceContext.getResourceItem(resourceFileName).getResourceAbsolutePathInLocal()),
                        StandardCharsets.UTF_8);
            } catch (IOException e) {
                log.error("Failed to read SQL content from file {}", resourceFileName, e);
                throw new TaskException("read sql content error", e);
            }
        } else {
            sqlContent = sparkParameters.getRawScript();
        }
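        // write the SQL text to a generated script file and pass its path to the -f option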
        args.add(generateScriptFile(sqlContent));
    }
    return args;
}
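
// Illustrative only (hypothetical values, not taken from the source): a Scala job
// submitted to YARN in cluster mode would produce an argument list along the lines of
//   --master yarn --deploy-mode cluster --class com.example.Main --name myApp
//   --queue default /tmp/dolphinscheduler/exec/app.jar arg1 arg2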