in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java [442:546]
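/**
 * Resolve the user jar and the effective application conf for the given application,
 * keyed on its job type (SPARK_SQL | PYSPARK | SPARK_JAR) and deploy mode.
 * Returns Tuple2.of(sparkUserJar, appConf); either element may be null when a branch
 * does not apply (e.g. no effective config, or a non-YARN deploy mode).
 */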
private Tuple2<String, String> getUserJarAndAppConf(
SparkEnv sparkEnv, SparkApplication application) {
SparkDeployMode deployModeEnum = application.getDeployModeEnum();
SparkApplicationConfig applicationConfig = configService.getEffective(application.getId());
ApiAlertException.throwIfNull(
deployModeEnum, "DeployMode can't be null, failed to start application.");
String sparkUserJar = null;
String appConf = null;
switch (application.getJobTypeEnum()) {
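// SPARK_SQL: submit the SQL job through the spark-sql client jar bundled with StreamPark.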
case SPARK_SQL:
SparkSql sparkSql = sparkSqlService.getEffective(application.getId(), false);
AssertUtils.notNull(sparkSql);
// 1) the spark-sql client (dist) jar shipped with StreamPark
String sqlDistJar = ServiceHelper.getSparkSqlClientJar(sparkEnv);
// 2) the effective application config, wrapped as a yaml:// URI
appConf = applicationConfig == null
? null
: String.format("yaml://%s", applicationConfig.getContent());
// 3) resolve the client jar path on the remote workspace for yarn-cluster
if (SparkDeployMode.YARN_CLUSTER == deployModeEnum) {
String clientPath = Workspace.remote().APP_CLIENT();
sparkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
}
break;
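// PYSPARK: the application's "jar" field references an uploaded .py resource.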
case PYSPARK:
appConf = applicationConfig == null
? null
: String.format("yaml://%s", applicationConfig.getContent());
Resource resource = resourceService.findByResourceName(application.getTeamId(), application.getJar());
ApiAlertException.throwIfNull(
resource, "PySpark file can't be null, failed to start application.");
ApiAlertException.throwIfNull(
resource.getFilePath(), "PySpark file path can't be null, failed to start application.");
ApiAlertException.throwIfFalse(
resource.getFilePath().endsWith(Constants.PYTHON_SUFFIX),
"PySpark file format error, must have a \".py\" suffix, failed to start application.");
sparkUserJar = resource.getFilePath();
break;
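// SPARK_JAR: a user-provided jar, either uploaded directly or built from a project.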
case SPARK_JAR:
if (application.isFromUploadJob()) {
appConf = applicationConfig == null
? null
: String.format("yaml://%s", applicationConfig.getContent());
} else {
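// Built from a project: derive appConf from the application type.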
switch (application.getApplicationType()) {
case STREAMPARK_SPARK:
ApiAlertException.throwIfNull(
applicationConfig, "Application config can't be null, failed to start application.");
ConfigFileTypeEnum fileType = ConfigFileTypeEnum.of(applicationConfig.getFormat());
if (fileType != null && ConfigFileTypeEnum.UNKNOWN != fileType) {
appConf = String.format(
"%s://%s", fileType.getTypeName(), applicationConfig.getContent());
} else {
throw new IllegalArgumentException(
"application config type error, must be (yaml | properties | hocon)");
}
break;
case APACHE_SPARK:
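// Reuses the Flink main-class config key to carry the Spark main class in json form.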
appConf = String.format(
"json://{\"%s\":\"%s\"}",
ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
break;
default:
throw new IllegalArgumentException(
"[StreamPark] ApplicationType must be (STREAMPARK_SPARK | APACHE_SPARK)");
}
}
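// Resolve where the user jar lives when submitting to YARN.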
if (SparkDeployMode.isYarnMode(deployModeEnum)) {
switch (application.getApplicationType()) {
case STREAMPARK_SPARK:
sparkUserJar = String.format(
"%s/%s",
application.getAppLib(),
application.getModule().concat(Constants.JAR_SUFFIX));
break;
case APACHE_SPARK:
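// Default to <appHome>/<jar>; if missing on HDFS, fall back to the uploaded
// resource's file name under appHome.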
sparkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
if (!FsOperator.hdfs().exists(sparkUserJar)) {
resource = resourceService.findByResourceName(
application.getTeamId(), application.getJar());
if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
sparkUserJar = String.format(
"%s/%s",
application.getAppHome(),
new File(resource.getFilePath()).getName());
}
}
break;
default:
throw new IllegalArgumentException(
"[StreamPark] ApplicationType must be (STREAMPARK_SPARK | APACHE_SPARK)");
}
}
break;
default:
throw new IllegalArgumentException(
"[StreamPark] JobType must be (SPARK_SQL | PYSPARK | SPARK_JAR)");
}
return Tuple2.of(sparkUserJar, appConf);
}