in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java [664:790]
/**
 * Resolves the user jar path and the application config string for the given application,
 * based on its job type, application type and deploy mode.
 *
 * @param flinkEnv the effective Flink environment for the application
 * @param application the application being started
 * @return a tuple of (flinkUserJar, appConf); either element may be {@code null} when it does
 *     not apply to the job/deploy mode combination
 * @throws ApiAlertException if the deploy mode is null or a required resource/config is missing
 * @throws IllegalArgumentException if the application/config type is unsupported
 */
private Tuple2<String, String> getUserJarAndAppConf(
    FlinkEnv flinkEnv, FlinkApplication application) {
  FlinkDeployMode deployModeEnum = application.getDeployModeEnum();
  FlinkApplicationConfig applicationConfig = configService.getEffective(application.getId());
  ApiAlertException.throwIfNull(
      deployModeEnum, "DeployMode can't be null, start application failed.");
  String flinkUserJar = null;
  String appConf = null;
  switch (application.getJobTypeEnum()) {
    case FLINK_SQL:
      // An effective SQL record must exist for a SQL job.
      AssertUtils.notNull(flinkSqlService.getEffective(application.getId(), false));
      appConf = yamlAppConf(applicationConfig);
      flinkUserJar =
          clientJarForYarnApplication(
              deployModeEnum, ServiceHelper.getFlinkSqlClientJar(flinkEnv));
      break;
    case FLINK_CDC:
      log.info("the current job id: {}", application.getId());
      // CDC jobs are stored through the same effective-SQL mechanism.
      AssertUtils.notNull(flinkSqlService.getEffective(application.getId(), false));
      appConf = yamlAppConf(applicationConfig);
      flinkUserJar =
          clientJarForYarnApplication(
              deployModeEnum, ServiceHelper.getFlinkCDCClientJar(flinkEnv));
      break;
    case PYFLINK:
      Resource resource =
          resourceService.findByResourceName(application.getTeamId(), application.getJar());
      ApiAlertException.throwIfNull(
          resource, "pyflink file can't be null, start application failed.");
      ApiAlertException.throwIfNull(
          resource.getFilePath(), "pyflink file can't be null, start application failed.");
      ApiAlertException.throwIfFalse(
          resource.getFilePath().endsWith(Constants.PYTHON_SUFFIX),
          "pyflink format error, must be a \".py\" suffix, start application failed.");
      flinkUserJar = resource.getFilePath();
      break;
    case FLINK_JAR:
      appConf = resolveFlinkJarAppConf(application, applicationConfig);
      if (FlinkDeployMode.YARN_APPLICATION == deployModeEnum) {
        flinkUserJar = resolveFlinkJarUserJar(application);
      }
      break;
    default:
      // Other job types resolve neither a user jar nor an app conf; callers receive
      // Tuple2.of(null, null), matching the previous fall-through behavior.
      break;
  }
  return Tuple2.of(flinkUserJar, appConf);
}

/** Wraps the effective config content as a "yaml://" conf string, or null when no config. */
private String yamlAppConf(FlinkApplicationConfig applicationConfig) {
  return applicationConfig == null
      ? null
      : String.format("yaml://%s", applicationConfig.getContent());
}

/** Returns the remote client dist-jar path for YARN application mode; null for other modes. */
private String clientJarForYarnApplication(FlinkDeployMode deployModeEnum, String distJar) {
  if (FlinkDeployMode.YARN_APPLICATION == deployModeEnum) {
    String clientPath = Workspace.remote().APP_CLIENT();
    return String.format("%s/%s", clientPath, distJar);
  }
  return null;
}

/** Resolves the appConf string for a FLINK_JAR job. */
private String resolveFlinkJarAppConf(
    FlinkApplication application, FlinkApplicationConfig applicationConfig) {
  if (application.isResourceFromUpload()) {
    // Uploaded jars carry only the main class; no effective config is required.
    return String.format(
        "json://{\"%s\":\"%s\"}",
        ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
  }
  switch (application.getApplicationType()) {
    case STREAMPARK_FLINK:
      // BUGFIX: getEffective(...) may return null (guarded elsewhere in this method);
      // previously this path threw a raw NPE on applicationConfig.getFormat().
      ApiAlertException.throwIfNull(
          applicationConfig, "Application config can't be null, start application failed.");
      ConfigFileTypeEnum fileType = ConfigFileTypeEnum.of(applicationConfig.getFormat());
      if (fileType != null && ConfigFileTypeEnum.UNKNOWN != fileType) {
        return String.format("%s://%s", fileType.getTypeName(), applicationConfig.getContent());
      }
      throw new IllegalArgumentException(
          "application' config type error,must be ( yaml| properties| hocon )");
    case APACHE_FLINK:
      return String.format(
          "json://{\"%s\":\"%s\"}",
          ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
    default:
      throw new IllegalArgumentException(
          "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
  }
}

/** Resolves the user jar path for a FLINK_JAR job running in YARN application mode. */
private String resolveFlinkJarUserJar(FlinkApplication application) {
  switch (application.getApplicationType()) {
    case STREAMPARK_FLINK:
      return String.format(
          "%s/%s",
          application.getAppLib(), application.getModule().concat(Constants.JAR_SUFFIX));
    case APACHE_FLINK:
      String flinkUserJar =
          String.format("%s/%s", application.getAppHome(), application.getJar());
      if (!FsOperator.hdfs().exists(flinkUserJar)) {
        // Jar missing on HDFS: fall back to the uploaded resource's file name under appHome.
        Resource resource =
            resourceService.findByResourceName(application.getTeamId(), application.getJar());
        if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
          flinkUserJar =
              String.format(
                  "%s/%s",
                  application.getAppHome(), new File(resource.getFilePath()).getName());
        }
      }
      return flinkUserJar;
    default:
      throw new IllegalArgumentException(
          "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
  }
}