in client/migrationx/migrationx-transformer/src/main/java/com/aliyun/dataworks/migrationx/transformer/dataworks/converter/dolphinscheduler/v3/nodes/parameters/SparkParameterConverter.java [85:161]
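/**
 * Builds the spark-submit argument list for a DolphinScheduler v3 Spark task and records any
 * referenced file resources in {@code resourceNames} so the generated DataWorks node can declare them.
 */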
private List<String> populateSparkOptions(String codeType, Collection<String> resourceNames) {
    List<String> args = new ArrayList<>();
    SparkParameters sparkParameters = this.parameter;
    ProgramType programType = sparkParameters.getProgramType();
    ResourceInfo mainJar = sparkParameters.getMainJar();

    // For non-SQL tasks, resolve the main JAR resource name (falling back to a lookup by id)
    // and register it so the generated code can reference the resource. The null check on
    // mainJar guards against an NPE for tasks without a main JAR, mirroring the check further below.
    if (programType != ProgramType.SQL && mainJar != null) {
        String resourceName = mainJar.getResourceName();
        if (StringUtils.isEmpty(resourceName)) {
            resourceName = getResourceNameById(mainJar.getId());
        }
        if (resourceName != null) {
            resourceNames.add(resourceName);
            mainJar.setResourceName(resourceName);
        }
    }
    // spark-submit is the base command for every program type.
    args.add(SparkConstants.SPARK_SUBMIT_COMMAND);

    // Prepend the DataWorks resource reference header (##@resource_reference{"<name>"}) so the
    // referenced files are attached to the node.
    String ref = DataStudioCodeUtils.addResourceReference(CodeProgramType.of(codeType), "", resourceNames);
    args.add(0, ref);
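
    // Deploy mode defaults to local; any non-local mode targets YARN as the master.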
    String deployMode = StringUtils.isNotEmpty(sparkParameters.getDeployMode())
        ? sparkParameters.getDeployMode()
        : SparkConstants.DEPLOY_MODE_LOCAL;
    if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
        args.add(SparkConstants.MASTER);
        args.add(SparkConstants.SPARK_ON_YARN);
    }
    args.add(SparkConstants.DEPLOY_MODE);
    args.add(deployMode);
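
    // --class only applies to JVM programs; PYTHON and SQL tasks have no main class.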
    String mainClass = sparkParameters.getMainClass();
    if (programType != ProgramType.PYTHON && programType != ProgramType.SQL && StringUtils.isNotEmpty(mainClass)) {
        args.add(SparkConstants.MAIN_CLASS);
        args.add(mainClass);
    }
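
    // Spark resource sizing options are appended by the shared populateSparkResourceDefinitions helper.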
    populateSparkResourceDefinitions(args, sparkParameters);

    String appName = sparkParameters.getAppName();
    if (StringUtils.isNotEmpty(appName)) {
        args.add(SparkConstants.SPARK_NAME);
        args.add(ArgsUtils.escape(appName));
    }
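
    // Set the YARN queue unless the job runs locally or the queue is already given in the free-form "others" options.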
    String others = sparkParameters.getOthers();
    if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)
        && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_YARN_QUEUE))) {
        String yarnQueue = sparkParameters.getYarnQueue();
        if (StringUtils.isNotEmpty(yarnQueue)) {
            args.add(SparkConstants.SPARK_YARN_QUEUE);
            args.add(yarnQueue);
        }
    }

    // Pass through any additional options (--conf, --files, --jars, --packages, ...) verbatim.
    if (StringUtils.isNotEmpty(others)) {
        args.add(others);
    }

    // Main JAR reference, followed by the program arguments (neither applies to SQL tasks).
    if (programType != ProgramType.SQL && mainJar != null) {
        args.add(mainJar.getResourceName());
    }

    String mainArgs = sparkParameters.getMainArgs();
    if (programType != ProgramType.SQL && StringUtils.isNotEmpty(mainArgs)) {
        args.add(mainArgs);
    }
    return args;
}
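
// Illustrative only: assuming the SparkConstants values map to the standard spark-submit flags
// (--master, --deploy-mode, --class, --name, --queue) and writing SPARK_SUBMIT_COMMAND simply as
// spark-submit, a JAVA task with deploy mode "cluster", main class "com.example.Main" and main JAR
// "my-app.jar" (all hypothetical names) would yield an argument list that joins roughly to
// (resource sizing flags omitted):
//
//   ##@resource_reference{"my-app.jar"}
//   spark-submit --master yarn --deploy-mode cluster --class com.example.Main \
//       --name my-app --queue default <others> my-app.jar <main args>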