in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java [1402:1649]
/**
 * Starts (submits) a Flink application.
 *
 * <p>The persisted application is loaded by {@code appParam.getId()}, validated, marked as
 * starting, and then submitted asynchronously through {@code FlinkClient.submit} on
 * {@code executorService}. The submit future is registered in {@code startFutureMap} and handed
 * to {@code CompletableFutureUtils.runTimeout} with a 2-minute limit together with a success and
 * a failure callback. Whatever the outcome, the final stage optionally configures the ingress
 * (Kubernetes application mode only), saves the {@code ApplicationLog} and removes the future
 * from {@code startFutureMap}.
 *
 * @param appParam request object; its id, allowNonRestored, restoreMode and savePointed values
 *     are read here (savePointed is forced to {@code true} on an automatic restart)
 * @param auto {@code false} for a manual start (restart counter is reset); {@code true} for an
 *     automatic restart, which silently returns unless
 *     {@code application.isNeedRestartOnFailed()} holds and otherwise increments the restart
 *     counter
 * @throws ApiAlertException if the application cannot be started, no Flink environment is
 *     found, or the execution mode cannot be resolved
 * @throws IllegalArgumentException if the config file type or application type is unsupported
 * @throws UnsupportedOperationException if the job is neither custom code nor Flink SQL
 * @throws Exception declared by the method signature
 */
public void start(Application appParam, boolean auto) throws Exception {
  final Application application = getById(appParam.getId());
  Utils.notNull(application);
  if (!application.isCanBeStart()) {
    throw new ApiAlertException("[StreamPark] The application cannot be started repeatedly.");
  }

  FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
  if (flinkEnv == null) {
    throw new ApiAlertException("[StreamPark] can no found flink version");
  }

  // if manually started, clear the restart flag
  if (!auto) {
    application.setRestartCount(0);
  } else {
    if (!application.isNeedRestartOnFailed()) {
      return;
    }
    appParam.setSavePointed(true);
    application.setRestartCount(application.getRestartCount() + 1);
  }

  starting(application);
  application.setAllowNonRestored(appParam.getAllowNonRestored());

  String appConf;
  String flinkUserJar = null;
  String jobId = new JobID().toHexString();

  ApplicationLog applicationLog = new ApplicationLog();
  applicationLog.setOptionName(Operation.START.getValue());
  applicationLog.setAppId(application.getId());
  applicationLog.setOptionTime(new Date());

  // set the latest to Effective, (it will only become the current effective at this time)
  this.toEffective(application);

  ApplicationConfig applicationConfig = configService.getEffective(application.getId());
  ExecutionMode executionMode = ExecutionMode.of(application.getExecutionMode());
  ApiAlertException.throwIfNull(
      executionMode, "ExecutionMode can't be null, start application failed.");

  if (application.isCustomCodeJob()) {
    if (application.isUploadJob()) {
      appConf =
          String.format(
              "json://{\"%s\":\"%s\"}",
              ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
    } else {
      switch (application.getApplicationType()) {
        case STREAMPARK_FLINK:
          // NOTE(review): applicationConfig is dereferenced without a null check here, while
          // the Flink SQL branch below tolerates a null config - confirm it can never be null.
          ConfigFileType fileType = ConfigFileType.of(applicationConfig.getFormat());
          if (fileType != null && !fileType.equals(ConfigFileType.UNKNOWN)) {
            appConf =
                String.format("%s://%s", fileType.getTypeName(), applicationConfig.getContent());
          } else {
            throw new IllegalArgumentException(
                "application' config type error,must be ( yaml| properties| hocon )");
          }
          break;
        case APACHE_FLINK:
          appConf =
              String.format(
                  "json://{\"%s\":\"%s\"}",
                  ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
          break;
        default:
          throw new IllegalArgumentException(
              "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
      }
    }

    // In yarn application mode the user jar path is resolved here and later wrapped in a
    // ShadedBuildResponse that replaces the pipeline's build result.
    if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
      switch (application.getApplicationType()) {
        case STREAMPARK_FLINK:
          flinkUserJar =
              String.format(
                  "%s/%s", application.getAppLib(), application.getModule().concat(".jar"));
          break;
        case APACHE_FLINK:
          flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
          break;
        default:
          throw new IllegalArgumentException(
              "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
      }
    }
  } else if (application.isFlinkSqlJob()) {
    FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
    Utils.notNull(flinkSql);
    // 1) dist_userJar
    String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
    // 2) appConfig
    appConf =
        applicationConfig == null
            ? null
            : String.format("yaml://%s", applicationConfig.getContent());
    // 3) client
    if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
      String clientPath = Workspace.remote().APP_CLIENT();
      flinkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
    }
  } else {
    throw new UnsupportedOperationException("Unsupported...");
  }

  Map<String, Object> extraParameter = new HashMap<>(0);
  if (application.isFlinkSqlJob()) {
    FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
    // Get the sql of the replaced placeholder
    String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
    flinkSql.setSql(DeflaterUtils.zipString(realSql));
    extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
  }

  KubernetesSubmitParam kubernetesSubmitParam =
      new KubernetesSubmitParam(
          application.getClusterId(),
          application.getK8sNamespace(),
          application.getK8sRestExposedTypeEnum());

  AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
  Utils.notNull(buildPipeline);

  BuildResult buildResult = buildPipeline.getBuildResult();
  if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
    buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
  }

  // Get the args after placeholder replacement
  String applicationArgs =
      variableService.replaceVariable(application.getTeamId(), application.getArgs());

  SubmitRequest submitRequest =
      new SubmitRequest(
          flinkEnv.getFlinkVersion(),
          // reuse the execution mode that was already resolved and null-checked above
          executionMode,
          getProperties(application),
          flinkEnv.getFlinkConf(),
          DevelopmentMode.of(application.getJobType()),
          application.getId(),
          jobId,
          application.getJobName(),
          appConf,
          application.getApplicationType(),
          getSavePointed(appParam),
          appParam.getRestoreMode() == null ? null : RestoreMode.of(appParam.getRestoreMode()),
          applicationArgs,
          buildResult,
          kubernetesSubmitParam,
          extraParameter);

  CompletableFuture<SubmitResponse> future =
      CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), executorService);

  // Registered so the pending submission can be looked up by application id elsewhere.
  startFutureMap.put(application.getId(), future);

  CompletableFutureUtils.runTimeout(
          future,
          2L,
          TimeUnit.MINUTES,
          submitResponse -> {
            // Success callback: copy the submit result onto the application and persist it.
            if (submitResponse.flinkConfig() != null) {
              String jmMemory =
                  submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY());
              if (jmMemory != null) {
                application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
              }
              String tmMemory =
                  submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_TM_PROCESS_MEMORY());
              if (tmMemory != null) {
                application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
              }
            }
            application.setAppId(submitResponse.clusterId());
            if (StringUtils.isNoneEmpty(submitResponse.jobId())) {
              application.setJobId(submitResponse.jobId());
            }

            if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) {
              application.setJobManagerUrl(submitResponse.jobManagerUrl());
              applicationLog.setJobManagerUrl(submitResponse.jobManagerUrl());
            }
            applicationLog.setYarnAppId(submitResponse.clusterId());
            application.setStartTime(new Date());
            application.setEndTime(null);
            if (isKubernetesApp(application)) {
              application.setRelease(ReleaseState.DONE.get());
            }
            updateById(application);

            // if start completed, will be added task to tracking queue
            if (isKubernetesApp(application)) {
              k8SFlinkTrackMonitor.doWatching(toTrackId(application));
            } else {
              FlinkHttpWatcher.setOptionState(appParam.getId(), OptionState.STARTING);
              FlinkHttpWatcher.doWatching(application);
            }

            applicationLog.setSuccess(true);
            // set savepoint to expire
            savePointService.expire(application.getId());
          },
          e -> {
            // Failure callback: a cancellation is treated as a stop, anything else as FAILED.
            if (e.getCause() instanceof CancellationException) {
              updateToStopped(application);
            } else {
              String exception = Utils.stringifyException(e);
              applicationLog.setException(exception);
              applicationLog.setSuccess(false);
              // Reload the row so the FAILED state is written onto the current persisted copy.
              Application app = getById(appParam.getId());
              app.setState(FlinkAppState.FAILED.getValue());
              app.setOptionState(OptionState.NONE.getValue());
              updateById(app);
              if (isKubernetesApp(app)) {
                k8SFlinkTrackMonitor.unWatching(toTrackId(app));
              } else {
                FlinkHttpWatcher.unWatching(appParam.getId());
              }
            }
          })
      .whenComplete(
          (t, e) -> {
            try {
              if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                String domainName = settingService.getIngressModeDefault();
                if (StringUtils.isNotBlank(domainName)) {
                  try {
                    IngressController.configureIngress(
                        domainName, application.getClusterId(), application.getK8sNamespace());
                  } catch (KubernetesClientException ingressError) {
                    log.info(
                        "Failed to create ingress, stack info:{}", ingressError.getMessage());
                    // Record the ingress failure itself. The stage's own throwable `e` is
                    // null whenever the submission succeeded, so it must not be dereferenced.
                    applicationLog.setException(ingressError.getMessage());
                    applicationLog.setSuccess(false);
                    applicationLogService.save(applicationLog);
                    application.setState(FlinkAppState.FAILED.getValue());
                    application.setOptionState(OptionState.NONE.getValue());
                    updateById(application);
                    return;
                  }
                }
              }

              applicationLogService.save(applicationLog);
            } finally {
              // Always drop the tracked future, including on the ingress-failure early return,
              // so completed futures do not accumulate in startFutureMap.
              startFutureMap.remove(application.getId());
            }
          });
}