in dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkArgsUtils.java [182:303]
private static List<String> buildRunCommandLineForOthers(TaskExecutionContext taskExecutionContext,
FlinkParameters flinkParameters) {
List<String> args = new ArrayList<>();
args.add(FlinkConstants.FLINK_COMMAND);
FlinkDeployMode deployMode = Optional.ofNullable(flinkParameters.getDeployMode()).orElse(DEFAULT_DEPLOY_MODE);
String flinkVersion = flinkParameters.getFlinkVersion();
// build run command
switch (deployMode) {
case CLUSTER:
if (FLINK_VERSION_AFTER_OR_EQUALS_1_12.equals(flinkVersion)
|| FLINK_VERSION_AFTER_OR_EQUALS_1_13.equals(flinkVersion)) {
args.add(FlinkConstants.FLINK_RUN); // run
args.add(FlinkConstants.FLINK_EXECUTION_TARGET); // -t
args.add(FlinkConstants.FLINK_YARN_PER_JOB); // yarn-per-job
} else {
args.add(FlinkConstants.FLINK_RUN); // run
args.add(FlinkConstants.FLINK_RUN_MODE); // -m
args.add(FlinkConstants.FLINK_YARN_CLUSTER); // yarn-cluster
}
break;
case APPLICATION:
args.add(FlinkConstants.FLINK_RUN_APPLICATION); // run-application
args.add(FlinkConstants.FLINK_EXECUTION_TARGET); // -t
args.add(FlinkConstants.FLINK_YARN_APPLICATION); // yarn-application
break;
case LOCAL:
args.add(FlinkConstants.FLINK_RUN); // run
break;
case STANDALONE:
args.add(FlinkConstants.FLINK_RUN); // run
break;
}
String others = flinkParameters.getOthers();
// build args
switch (deployMode) {
case CLUSTER:
case APPLICATION:
int slot = flinkParameters.getSlot();
if (slot > 0) {
args.add(FlinkConstants.FLINK_YARN_SLOT);
args.add(String.format("%d", slot)); // -ys
}
String appName = flinkParameters.getAppName();
if (StringUtils.isNotEmpty(appName)) { // -ynm
args.add(FlinkConstants.FLINK_APP_NAME);
args.add(ArgsUtils.escape(appName));
}
// judge flink version, the parameter -yn has removed from flink 1.10
if (flinkVersion == null || FLINK_VERSION_BEFORE_1_10.equals(flinkVersion)) {
int taskManager = flinkParameters.getTaskManager();
if (taskManager > 0) { // -yn
args.add(FlinkConstants.FLINK_TASK_MANAGE);
args.add(String.format("%d", taskManager));
}
}
String jobManagerMemory = flinkParameters.getJobManagerMemory();
if (StringUtils.isNotEmpty(jobManagerMemory)) {
args.add(FlinkConstants.FLINK_JOB_MANAGE_MEM);
args.add(jobManagerMemory); // -yjm
}
String taskManagerMemory = flinkParameters.getTaskManagerMemory();
if (StringUtils.isNotEmpty(taskManagerMemory)) { // -ytm
args.add(FlinkConstants.FLINK_TASK_MANAGE_MEM);
args.add(taskManagerMemory);
}
break;
case LOCAL:
break;
case STANDALONE:
break;
}
int parallelism = flinkParameters.getParallelism();
if (parallelism > 0) {
args.add(FlinkConstants.FLINK_PARALLELISM);
args.add(String.format("%d", parallelism)); // -p
}
// If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated
// abruptly
// The task status will be synchronized with the cluster job status
args.add(FlinkConstants.FLINK_SHUTDOWN_ON_ATTACHED_EXIT); // -sae
// -s -yqu -yat -yD -D
if (StringUtils.isNotEmpty(others)) {
args.add(others);
}
// determine yarn queue
determinedYarnQueue(args, flinkParameters, deployMode, flinkVersion);
ProgramType programType = flinkParameters.getProgramType();
String mainClass = flinkParameters.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
args.add(FlinkConstants.FLINK_MAIN_CLASS); // -c
args.add(flinkParameters.getMainClass()); // main class
}
ResourceInfo mainJar = flinkParameters.getMainJar();
if (mainJar != null) {
// -py
if (ProgramType.PYTHON == programType) {
args.add(FlinkConstants.FLINK_PYTHON);
}
ResourceContext resourceContext = taskExecutionContext.getResourceContext();
args.add(resourceContext.getResourceItem(mainJar.getResourceName()).getResourceAbsolutePathInLocal());
}
String mainArgs = flinkParameters.getMainArgs();
if (StringUtils.isNotEmpty(mainArgs)) {
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
args.add(ParameterUtils.convertParameterPlaceholders(mainArgs, ParameterUtils.convert(paramsMap)));
}
return args;
}