in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java [409:527]
public void start(FlinkApplication appParam, boolean auto) throws Exception {
// For HA purposes, if the task is not processed locally, save the Distribution task and return
if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
distributedTaskService.saveDistributedTask(appParam, auto, DistributedTaskEnum.START);
return;
}
// 1) check application
final FlinkApplication application = getById(appParam.getId());
AssertUtils.notNull(application);
ApiAlertException.throwIfTrue(
!application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly.");
if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum())
|| FlinkDeployMode.isSessionMode(application.getDeployModeEnum())) {
checkBeforeStart(application);
}
if (FlinkDeployMode.isYarnMode(application.getDeployModeEnum())) {
ApiAlertException.throwIfTrue(
!applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(),
"[StreamPark] The same task name is already running in the yarn queue");
}
ApplicationBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
AssertUtils.notNull(buildPipeline);
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
ApiAlertException.throwIfNull(flinkEnv, "[StreamPark] can no found flink version");
// if manually started, clear the restart flag
if (!auto) {
application.setRestartCount(0);
} else {
if (!application.isNeedRestartOnFailed()) {
return;
}
appParam.setRestoreOrTriggerSavepoint(true);
application.setRestartCount(application.getRestartCount() + 1);
}
// 2) update app state to starting...
starting(application);
ApplicationLog applicationLog = constructAppLog(application);
// set the latest to Effective, (it will only become the current effective at this time)
applicationManageService.toEffective(application);
Map<String, Object> extraParameter = new HashMap<>(0);
if (application.isJobTypeFlinkSqlOrCDC()) {
FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
// Get the sql of the replaced placeholder
String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
flinkSql.setSql(DeflaterUtils.zipString(realSql));
extraParameter.put(ConfigKeys.KEY_FLINK_SQL(null), flinkSql.getSql());
}
Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(flinkEnv, application);
String flinkUserJar = userJarAndAppConf.t1;
String appConf = userJarAndAppConf.t2;
BuildResult buildResult = buildPipeline.getBuildResult();
if (FlinkDeployMode.YARN_APPLICATION == application.getDeployModeEnum()) {
buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
}
// Get the args after placeholder replacement
String args =
StringUtils.isBlank(appParam.getArgs()) ? application.getArgs() : appParam.getArgs();
String applicationArgs = variableService.replaceVariable(application.getTeamId(), args);
Tuple3<String, String, FlinkK8sRestExposedType> clusterIdNamespace =
getNamespaceClusterId(application);
String k8sNamespace = clusterIdNamespace.t1;
String k8sClusterId = clusterIdNamespace.t2;
FlinkK8sRestExposedType exposedType = clusterIdNamespace.t3;
String dynamicProperties =
StringUtils.isBlank(appParam.getDynamicProperties())
? application.getDynamicProperties()
: appParam.getDynamicProperties();
SubmitRequest submitRequest =
new SubmitRequest(
flinkEnv.getFlinkVersion(),
FlinkDeployMode.of(application.getDeployMode()),
getProperties(application, dynamicProperties),
flinkEnv.getFlinkConf(),
FlinkJobType.of(application.getJobType()),
application.getId(),
new JobID().toHexString(),
application.getJobName(),
appConf,
application.getApplicationType(),
getSavepointPath(appParam),
FlinkRestoreMode.of(appParam.getRestoreMode()),
applicationArgs,
k8sClusterId,
application.getHadoopUser(),
buildResult,
extraParameter,
k8sNamespace,
exposedType);
CompletableFuture<SubmitResponse> future =
CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), executorService);
startFutureMap.put(application.getId(), future);
future.whenCompleteAsync(
(response, throwable) -> {
// 1) remove Future
startFutureMap.remove(application.getId());
// 2) exception
if (throwable != null) {
processForException(appParam, throwable, applicationLog, application);
return;
}
// 3) success
processForSuccess(appParam, response, applicationLog, application);
});
}