in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java [274:409]
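/**
 * Starts (submits) a Spark application.
 *
 * <p>In an HA setup, if this node does not own the task, the start request is saved as a
 * distributed task for the owning node to pick up and this method returns immediately.
 * Otherwise the application is validated, marked as starting, and submitted asynchronously
 * through {@link SparkClient}.
 *
 * @param appParam the application to start
 * @param auto true when triggered by the automatic restart-on-failure mechanism,
 *             false when started manually by a user
 */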
public void start(SparkApplication appParam, boolean auto) throws Exception {
// For HA purposes: if the task is not processed locally, save it as a distributed task and return
if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.START);
return;
}
// 1) check application
final SparkApplication application = getById(appParam.getId());
AssertUtils.notNull(application);
ApiAlertException.throwIfTrue(
!application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly.");
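// resolve the Spark environment bound to this application, falling back to the default if none is set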
SparkEnv sparkEnv = sparkEnvService.getByIdOrDefault(application.getVersionId());
ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] cannot find the Spark version");
if (SparkDeployMode.isYarnMode(application.getDeployModeEnum())) {
checkYarnBeforeStart(application);
}
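// the application must have gone through the build pipeline before it can be launched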
ApplicationBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
AssertUtils.notNull(buildPipeline);
// if started manually, reset the restart count; otherwise only proceed if the app is flagged to restart on failure
if (!auto) {
application.setRestartCount(0);
} else {
if (!application.isNeedRestartOnFailed()) {
return;
}
application.setRestartCount(application.getRestartCount() + 1);
}
// 2) update app state to starting...
starting(application);
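// prepare an operation log entry for this start attempt; it is persisted when the submit future completes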
ApplicationLog applicationLog = new ApplicationLog();
applicationLog.setJobType(EngineTypeEnum.SPARK.getCode());
applicationLog.setOptionName(SparkOperationEnum.START.getValue());
applicationLog.setAppId(application.getId());
applicationLog.setCreateTime(new Date());
applicationLog.setUserId(ServiceHelper.getUserId());
// set the latest config to effective (it only becomes the current effective config at this point)
// applicationManageService.toEffective(application);
Map<String, Object> extraParameter = new HashMap<>(0);
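// for Spark SQL jobs, replace variables in the effective SQL and hand the compressed text
// to the submit request as an extra parameter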
if (application.isSparkSqlJob()) {
SparkSql sparkSql = sparkSqlService.getEffective(application.getId(), true);
// Get the SQL with its placeholders replaced
String realSql = variableService.replaceVariable(application.getTeamId(), sparkSql.getSql());
sparkSql.setSql(DeflaterUtils.zipString(realSql));
extraParameter.put(ConfigKeys.KEY_SPARK_SQL(null), sparkSql.getSql());
}
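// resolve the user jar and the application conf to submit with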
Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(sparkEnv, application);
String sparkUserJar = userJarAndAppConf.f0;
String appConf = userJarAndAppConf.f1;
BuildResult buildResult = buildPipeline.getBuildResult();
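// in YARN modes the user jar is submitted directly as a shaded build result, and the
// YARN queue name/label, if configured, are forwarded as extra parameters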
if (SparkDeployMode.isYarnMode(application.getDeployModeEnum())) {
buildResult = new ShadedBuildResponse(null, sparkUserJar, true);
if (StringUtils.isNotBlank(application.getYarnQueueName())) {
extraParameter.put(ConfigKeys.KEY_SPARK_YARN_QUEUE_NAME(), application.getYarnQueueName());
}
if (StringUtils.isNotBlank(application.getYarnQueueLabel())) {
extraParameter.put(ConfigKeys.KEY_SPARK_YARN_QUEUE_LABEL(), application.getYarnQueueLabel());
}
}
// Get the args after placeholder replacement
String applicationArgs = variableService.replaceVariable(application.getTeamId(), application.getAppArgs());
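// assemble the submit request from the Spark environment, the application settings,
// and the parameters collected above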
SubmitRequest submitRequest = new SubmitRequest(
sparkEnv.getSparkVersion(),
SparkDeployMode.of(application.getDeployMode()),
sparkEnv.getSparkConf(),
SparkJobType.valueOf(application.getJobType()),
application.getId(),
application.getAppName(),
application.getMainClass(),
appConf,
PropertiesUtils.extractSparkPropertiesAsJava(application.getAppProperties()),
PropertiesUtils.extractSparkArgumentsAsJava(applicationArgs),
application.getApplicationType(),
application.getHadoopUser(),
buildResult,
extraParameter);
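// submit asynchronously and track the future so an in-flight start can be cancelled
// (cancellation surfaces as the CancellationException handled below)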
CompletableFuture<SubmitResponse> future = CompletableFuture
.supplyAsync(() -> SparkClient.submit(submitRequest), executorService);
startJobFutureMap.put(application.getId(), future);
future.whenComplete(
(response, throwable) -> {
// 1) remove Future
startJobFutureMap.remove(application.getId());
// 2) exception
if (throwable != null) {
String exception = ExceptionUtils.stringifyException(throwable);
applicationLog.setException(exception);
applicationLog.setSuccess(false);
applicationLogService.save(applicationLog);
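// a CancellationException means the tracked future was cancelled, so the app is moved to a
// stopped state; any other failure marks it FAILED and removes it from the watcher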
if (throwable instanceof CancellationException) {
doStopped(application.getId());
} else {
SparkApplication app = getById(appParam.getId());
app.setState(SparkAppStateEnum.FAILED.getValue());
app.setOptionState(SparkOptionStateEnum.NONE.getValue());
updateById(app);
SparkAppHttpWatcher.unWatching(appParam.getId());
}
return;
}
// 3) success
applicationLog.setSuccess(true);
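// NOTE: resolveScheduleConf presumably extracts scheduling/resource-related settings from the
// Spark properties returned by the submit (the exact semantics live in SparkApplication)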
application.resolveScheduleConf(response.sparkProperties());
if (StringUtils.isNotEmpty(response.sparkAppId())) {
application.setClusterId(response.sparkAppId());
}
applicationLog.setClusterId(response.sparkAppId());
applicationLog.setTrackingUrl(response.trackingUrl());
application.setStartTime(new Date());
application.setEndTime(null);
// once the start completes successfully, add the application to the tracking queue
SparkAppHttpWatcher.setOptionState(appParam.getId(), SparkOptionStateEnum.STARTING);
SparkAppHttpWatcher.doWatching(application);
// update app
updateById(application);
// save log
applicationLogService.save(applicationLog);
});
}