in client/migrationx/migrationx-transformer/src/main/java/com/aliyun/dataworks/migrationx/transformer/dataworks/converter/dolphinscheduler/v3/workflow/parameters/FlinkParameterConverter.java [205:327]
private List<String> buildRunCommandLineForOthers(FlinkParameters flinkParameters) {
List<String> args = new ArrayList<>();
args.add(FlinkConstants.FLINK_COMMAND);
FlinkDeployMode deployMode = Optional.ofNullable(flinkParameters.getDeployMode()).orElse(DEFAULT_DEPLOY_MODE);
String flinkVersion = flinkParameters.getFlinkVersion();
// build run command
switch (deployMode) {
case CLUSTER:
if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion)
|| FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) {
args.add(FlinkConstants.FLINK_RUN); // run
args.add(FlinkConstants.FLINK_EXECUTION_TARGET); // -t
args.add(FlinkConstants.FLINK_YARN_PER_JOB); // yarn-per-job
} else {
args.add(FlinkConstants.FLINK_RUN); // run
args.add(FlinkConstants.FLINK_RUN_MODE); // -m
args.add(FlinkConstants.FLINK_YARN_CLUSTER); // yarn-cluster
}
break;
case APPLICATION:
args.add(FlinkConstants.FLINK_RUN_APPLICATION); // run-application
args.add(FlinkConstants.FLINK_EXECUTION_TARGET); // -t
args.add(FlinkConstants.FLINK_YARN_APPLICATION); // yarn-application
break;
case LOCAL:
args.add(FlinkConstants.FLINK_RUN); // run
break;
case STANDALONE:
args.add(FlinkConstants.FLINK_RUN); // run
break;
}
String others = flinkParameters.getOthers();
// build args
switch (deployMode) {
case CLUSTER:
case APPLICATION:
int slot = flinkParameters.getSlot();
if (slot > 0) {
args.add(FlinkConstants.FLINK_YARN_SLOT);
args.add(String.format("%d", slot)); // -ys
}
String appName = flinkParameters.getAppName();
if (StringUtils.isNotEmpty(appName)) { // -ynm
args.add(FlinkConstants.FLINK_APP_NAME);
args.add(ArgsUtils.escape(appName));
}
// judge flink version, the parameter -yn has removed from flink 1.10
if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
int taskManager = flinkParameters.getTaskManager();
if (taskManager > 0) { // -yn
args.add(FlinkConstants.FLINK_TASK_MANAGE);
args.add(String.format("%d", taskManager));
}
}
String jobManagerMemory = flinkParameters.getJobManagerMemory();
if (StringUtils.isNotEmpty(jobManagerMemory)) {
args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
args.add(jobManagerMemory); // -yjm
}
String taskManagerMemory = flinkParameters.getTaskManagerMemory();
if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm
args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
args.add(taskManagerMemory);
}
break;
case LOCAL:
break;
case STANDALONE:
break;
}
int parallelism = flinkParameters.getParallelism();
if (parallelism > 0) {
args.add(FlinkConstants.FLINK_PARALLELISM);
args.add(String.format("%d", parallelism)); // -p
}
// If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated
// abruptly
// The task status will be synchronized with the cluster job status
args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
// -s -yqu -yat -yD -D
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
// determine yarn queue
determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
ProgramType programType = flinkParameters.getProgramType();
String mainClass = flinkParameters.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(FlinkConstants.FLINK_MAIN_CLASS); // -c
args.add(flinkParameters.getMainClass()); // main class
}
ResourceInfo mainJar = flinkParameters.getMainJar();
if (mainJar != null) {
// -py
if (ProgramType.PYTHON == programType) {
args.add(FlinkConstants.FLINK_PYTHON);
}
String name = mainJar.getResourceName();
if (name == null && mainJar.getId() != null) {
name = getResourceNameById(mainJar.getId());
}
args.add(name);
}
String mainArgs = flinkParameters.getMainArgs();
if (StringUtils.isNotEmpty(mainArgs)) {
args.add(mainArgs);
}
return args;
}