in client/migrationx/migrationx-transformer/src/main/java/com/aliyun/dataworks/migrationx/transformer/dataworks/converter/dolphinscheduler/v3/workflow/parameters/SparkParameterConverter.java [100:200]
private List<String> populateSparkOptions(SparkParameters sparkParameters) {
    List<String> args = new ArrayList<>();
    ProgramType programType = sparkParameters.getProgramType();
    // SQL script: for the SCRIPT execution type the raw SQL itself is the node content
    if (ProgramType.SQL == programType
        && SparkConstants.TYPE_SCRIPT.equals(sparkParameters.getSqlExecutionType())) {
        String sqlContent = sparkParameters.getRawScript();
        args.add(sqlContent);
        return args;
    }
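    // Illustrative (hypothetical content, not from the source): for a script-type SQL
    // node the returned list holds a single element, the raw SQL text itself,
    // e.g. "SELECT * FROM dual"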
    // resource file: resolve the main jar name and prefix a DataWorks resource reference.
    // Guard against a missing main jar here, consistent with the mainJar != null check
    // applied further below when the jar is appended to the command line.
    ResourceInfo mainJar = sparkParameters.getMainJar();
    if (programType != ProgramType.SQL && mainJar != null) {
        String resource = mainJar.getResourceName();
        if (StringUtils.isEmpty(resource)) {
            resource = getResourceNameById(mainJar.getId());
        }
        if (resource != null) {
            // keep only the file name, dropping any directory prefix
            String[] resources = resource.split("/");
            if (resources.length > 0) {
                resource = resources[resources.length - 1];
            }
            mainJar.setResourceName(resource);
            // DataWorks resolves this annotation and attaches the referenced file at runtime
            String dwResource = "##@resource_reference{\"" + resource + "\"} \n";
            args.add(dwResource + SparkConstants.SPARK_SUBMIT_COMMAND);
        } else {
            args.add(SparkConstants.SPARK_SUBMIT_COMMAND);
        }
    } else {
        args.add(SparkConstants.SPARK_SUBMIT_COMMAND);
    }
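    // Illustrative (hypothetical jar name): with a main jar resolved to "demo-spark.jar",
    // the first element of args would look like
    //   ##@resource_reference{"demo-spark.jar"} \nspark-submit
    // assuming SPARK_SUBMIT_COMMAND is the plain "spark-submit" string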
    // deploy mode defaults to local; any non-local mode runs on YARN
    String deployMode = StringUtils.isNotEmpty(sparkParameters.getDeployMode())
        ? sparkParameters.getDeployMode()
        : SparkConstants.DEPLOY_MODE_LOCAL;
    if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)) {
        args.add(SparkConstants.MASTER);
        String masterUrl = SparkConstants.SPARK_ON_YARN;
        args.add(masterUrl);
    }
    args.add(SparkConstants.DEPLOY_MODE);
    args.add(deployMode);
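    // Illustrative (assumed constant values "--master", "yarn", "--deploy-mode"):
    // a "cluster" deploy mode appends "--master yarn --deploy-mode cluster",
    // while local mode appends only "--deploy-mode local"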
    // --class only applies to JVM programs; PYTHON and SQL have no main class
    String mainClass = sparkParameters.getMainClass();
    if (programType != ProgramType.PYTHON && programType != ProgramType.SQL && StringUtils.isNotEmpty(mainClass)) {
        args.add(SparkConstants.MAIN_CLASS);
        args.add(mainClass);
    }
    populateSparkResourceDefinitions(args, sparkParameters);
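    // Assumption (mirroring DolphinScheduler's SparkTask, not verified here): this helper
    // appends sizing flags such as --driver-cores, --driver-memory, --num-executors,
    // --executor-cores and --executor-memory from the corresponding SparkParameters fields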
    String appName = sparkParameters.getAppName();
    if (StringUtils.isNotEmpty(appName)) {
        args.add(SparkConstants.SPARK_NAME);
        // escape the user-supplied name so it survives shell parsing
        args.add(ArgsUtils.escape(appName));
    }
    String others = sparkParameters.getOthers();
    // add the YARN queue only for non-local runs, and only if the user has not
    // already set it through the free-form "others" options
    if (!SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode)
        && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_YARN_QUEUE))) {
        String yarnQueue = sparkParameters.getYarnQueue();
        if (StringUtils.isNotEmpty(yarnQueue)) {
            args.add(SparkConstants.SPARK_YARN_QUEUE);
            args.add(yarnQueue);
        }
    }
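    // Illustrative (assuming SPARK_YARN_QUEUE is "--queue"): a queue named "default"
    // yields "--queue default" here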
    // free-form options such as --conf, --files, --jars, --packages
    if (StringUtils.isNotEmpty(others)) {
        args.add(others);
    }
    // main jar reference (non-SQL programs only)
    if (programType != ProgramType.SQL && mainJar != null) {
        args.add(mainJar.getResourceName());
    }
    String mainArgs = sparkParameters.getMainArgs();
    if (programType != ProgramType.SQL && StringUtils.isNotEmpty(mainArgs)) {
        args.add(mainArgs);
    }
    // SQL file mode: bin/spark-sql -f fileName
    if (ProgramType.SQL == programType) {
        String resourceFileName = "";
        args.add(SparkConstants.SQL_FROM_FILE);
        if (SparkConstants.TYPE_FILE.equals(sparkParameters.getSqlExecutionType())) {
            // guard against a missing or empty resource list before indexing into it
            final List<ResourceInfo> resourceInfos = sparkParameters.getResourceList();
            if (resourceInfos != null && !resourceInfos.isEmpty()) {
                if (resourceInfos.size() > 1) {
                    log.warn("more than one resource file detected, using the first one by default");
                }
                resourceFileName = resourceInfos.get(0).getResourceName();
            }
        }
        args.add(resourceFileName);
    }
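    // Illustrative (assuming SQL_FROM_FILE is "-f"): file-based SQL ends with
    //   -f <first resource file name>
    // matching the "bin/spark-sql -f fileName" form noted above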
    return args;
}
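// Illustrative end-to-end result (hypothetical values, not from the source): for a JAVA
// program with main jar "demo.jar", main class "com.example.Demo" and deploy mode
// "cluster", String.join(" ", populateSparkOptions(params)) would resemble:
//   ##@resource_reference{"demo.jar"}
//   spark-submit --master yarn --deploy-mode cluster --class com.example.Demo ... demo.jar <mainArgs>
// where "..." stands for the resource-sizing and --name flags added in between.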