in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java [178:389]
/**
 * Prepares the build pipeline for an application and submits it for asynchronous execution.
 *
 * <p>Flow, in order:
 * <ol>
 *   <li>validate the build environment via {@code checkBuildEnv};</li>
 *   <li>load the application and create its {@code ApplicationLog};</li>
 *   <li>if {@code checkBuildAndUpdate} reports that no build is needed, record a successful log
 *       entry and return {@code true} immediately;</li>
 *   <li>roll back the Flink SQL when the app is flagged for rollback and is a Flink SQL/CDC job;</li>
 *   <li>for FLINK_SQL, PYFLINK and FLINK_CDC jobs, copy dependency and team resource from the
 *       Flink SQL record onto the app;</li>
 *   <li>create the pipeline, remove previously stored pipeline records for the app, and register
 *       a watcher that persists snapshots and updates the release state;</li>
 *   <li>persist the initial pipeline snapshot, invalidate the docker progress caches and submit
 *       {@code pipeline::launch} to {@code executorService}.</li>
 * </ol>
 *
 * <p>The build itself is submitted to an executor, so the return value does not describe the
 * build outcome; the outcome is recorded by the watcher's {@code onFinish} callback.
 *
 * @param appId id of the application to build
 * @param forceBuild forwarded to {@code checkBuildEnv}; not used anywhere else in this method
 * @return {@code true} when no build is required; otherwise the result of saving the initial
 *     pipeline snapshot
 */
public boolean buildApplication(@Nonnull Long appId, boolean forceBuild) {
// check the build environment
checkBuildEnv(appId, forceBuild);
FlinkApplication app = applicationManageService.getById(appId);
ApplicationLog applicationLog = getApplicationLog(app);
// check if you need to go through the build process (if the jar and pom have changed,
// you need to go through the build process, if other common parameters are modified,
// you don't need to go through the build process)
boolean needBuild = applicationManageService.checkBuildAndUpdate(app);
if (!needBuild) {
// nothing to build: record the operation as successful and stop here
applicationLog.setSuccess(true);
applicationLogService.save(applicationLog);
return true;
}
// rollback: only applies to Flink SQL / CDC jobs that are flagged as needing a rollback
if (app.isNeedRollback() && app.isJobTypeFlinkSqlOrCDC()) {
flinkSqlService.rollback(app);
}
// 1) for FLINK_SQL / PYFLINK / FLINK_CDC jobs, take dependency and team resource from the
// Flink SQL record: prefer the NEW candidate, otherwise fall back to the effective one
FlinkSql newFlinkSql = flinkSqlService.getCandidate(app.getId(), CandidateTypeEnum.NEW);
FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false);
FlinkJobType jobType = app.getJobTypeEnum();
if (jobType == FlinkJobType.FLINK_SQL || jobType == FlinkJobType.PYFLINK || jobType == FlinkJobType.FLINK_CDC) {
FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql;
// neither a candidate nor an effective Flink SQL exists: fail before creating the pipeline
AssertUtils.notNull(flinkSql);
app.setDependency(flinkSql.getDependency());
app.setTeamResource(flinkSql.getTeamResource());
}
// create pipeline instance
BuildPipeline pipeline = createPipelineInstance(app);
// clear history
removeByAppId(app.getId());
// register pipeline progress event watcher.
// save snapshot of pipeline to db when status of pipeline was changed.
// NOTE(review): the callbacks are presumably invoked from pipeline.launch(), which is submitted
// to executorService below, so they likely run off the calling thread - confirm.
pipeline.registerWatcher(
new PipeWatcher() {
@Override
public void onStart(PipelineSnapshot snapshot) {
// persist the starting snapshot and mark the app as RELEASING
ApplicationBuildPipeline buildPipeline = ApplicationBuildPipeline.fromPipeSnapshot(snapshot)
.setAppId(app.getId());
saveEntity(buildPipeline);
app.setRelease(ReleaseStateEnum.RELEASING.get());
applicationManageService.updateRelease(app);
// re-initialise the HTTP watcher when it is tracking this app
// (presumably so it picks up the updated release state - confirm)
if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
flinkAppHttpWatcher.init();
}
// 1) checkEnv
applicationInfoService.checkEnv(app);
// 2) some preparatory work
String appUploads = app.getWorkspace().APP_UPLOADS();
if (app.isJobTypeFlinkJarOrPyFlink()) {
// flinkJar upload jar to appHome...
String appHome = app.getAppHome();
FsOperator fsOperator = app.getFsOperator();
// start from a clean app home
fsOperator.delete(appHome);
if (app.isResourceFromUpload()) {
String uploadJar = appUploads.concat("/").concat(app.getJar());
// first choice: the jar under <local uploads>/<teamId>/<jar>
File localJar = new File(
String.format(
"%s/%d/%s",
Workspace.local().APP_UPLOADS(),
app.getTeamId(),
app.getJar()));
if (!localJar.exists()) {
// second choice: the file path recorded on the team resource with the same name;
// last resort: the jar in the application temp dir.
// Either way the upload target is re-derived from the chosen local file's name.
Resource resource = resourceService.findByResourceName(app.getTeamId(),
app.getJar());
if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
localJar = new File(resource.getFilePath());
uploadJar = appUploads.concat("/").concat(localJar.getName());
} else {
localJar =
new File(WebUtils.getAppTempDir(), app.getJar());
uploadJar = appUploads.concat("/").concat(localJar.getName());
}
}
// upload jar copy to appHome
// NOTE(review): app.getFsOperator() is fetched again here although the local
// fsOperator above holds the same call's result - confirm they are interchangeable.
checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar, appUploads);
// destination depends on the application type: the app lib dir for STREAMPARK_FLINK,
// the app home itself for APACHE_FLINK
switch (app.getApplicationType()) {
case STREAMPARK_FLINK:
fsOperator.mkdirs(app.getAppLib());
fsOperator.copy(uploadJar, app.getAppLib(), false, true);
break;
case APACHE_FLINK:
fsOperator.mkdirs(appHome);
fsOperator.copy(uploadJar, appHome, false, true);
break;
default:
throw new IllegalArgumentException(
"[StreamPark] unsupported ApplicationType of FlinkJar: "
+ app.getApplicationType());
}
} else {
// resource does not come from an upload: push the dist home to the app home
fsOperator.upload(app.getDistHome(), appHome);
}
} else {
// other job types: make sure every dependency jar is present in the local upload dir
if (!app.getDependencyObject().getJar().isEmpty()) {
String localUploads = Workspace.local().APP_UPLOADS();
// copy jar to local upload dir
for (String jar : app.getDependencyObject().getJar()) {
File localJar = new File(WebUtils.getAppTempDir(), jar);
File uploadJar = new File(localUploads, jar);
// the jar must exist in the temp dir or already be in the upload dir
if (!localJar.exists() && !uploadJar.exists()) {
throw new ApiAlertException(
"Missing file: " + jar + ", please upload again");
}
// only a jar still sitting in the temp dir needs to be (re)checked/uploaded
if (localJar.exists()) {
checkOrElseUploadJar(
FsOperator.lfs(), localJar, uploadJar.getAbsolutePath(),
localUploads);
}
}
}
}
}
@Override
public void onStepStateChange(PipelineSnapshot snapshot) {
// persist the snapshot on every step state change
ApplicationBuildPipeline buildPipeline = ApplicationBuildPipeline.fromPipeSnapshot(snapshot)
.setAppId(app.getId());
saveEntity(buildPipeline);
}
@Override
public void onFinish(PipelineSnapshot snapshot, BuildResult result) {
// persist the final snapshot together with the build result
ApplicationBuildPipeline buildPipeline = ApplicationBuildPipeline.fromPipeSnapshot(snapshot)
.setAppId(app.getId())
.setBuildResult(result);
saveEntity(buildPipeline);
if (result.pass()) {
// the job is currently running: flag it as NEED_RESTART instead of promoting anything
if (app.isRunning()) {
app.setRelease(ReleaseStateEnum.NEED_RESTART.get());
} else {
app.setOptionState(OptionStateEnum.NONE.getValue());
app.setRelease(ReleaseStateEnum.DONE.get());
// If the current task is not running, or the task has just been added, directly set
// the candidate version to the official version
if (app.isJobTypeFlinkSqlOrCDC()) {
applicationManageService.toEffective(app);
} else {
// StreamPark-type apps: apply the latest config to the app and make it effective
if (app.isAppTypeStreamPark()) {
FlinkApplicationConfig config =
applicationConfigService.getLatest(app.getId());
if (config != null) {
config.setToApplication(app);
applicationConfigService.toEffective(app.getId(),
app.getConfigId());
}
}
}
}
// backup: skipped when the app is flagged for rollback; the new Flink SQL is passed along
// only for Flink SQL / CDC jobs that have a NEW candidate, otherwise null is passed
if (!app.isNeedRollback()) {
if (app.isJobTypeFlinkSqlOrCDC() && newFlinkSql != null) {
backUpService.backup(app, newFlinkSql);
} else {
backUpService.backup(app, null);
}
}
applicationLog.setSuccess(true);
// presumably clears the "needs build" flag - confirm against FlinkApplication
app.setBuild(false);
} else {
// build failed: push an EXCEPTION notice carrying the stringified pipeline error
// NOTE(review): if this callback runs on the executor thread, confirm that
// ServiceHelper.getUserId() still resolves the user who triggered the build.
Message message = new Message(
ServiceHelper.getUserId(),
app.getId(),
app.getJobName().concat(" release failed"),
ExceptionUtils.stringifyException(snapshot.error().exception()),
NoticeTypeEnum.EXCEPTION);
messageService.push(message);
app.setRelease(ReleaseStateEnum.FAILED.get());
app.setOptionState(OptionStateEnum.NONE.getValue());
// presumably keeps the app marked as still needing a build - confirm
app.setBuild(true);
// NOTE(review): the same exception is stringified a second time here; the value
// computed for the message above could be reused.
applicationLog.setException(
ExceptionUtils.stringifyException(snapshot.error().exception()));
applicationLog.setSuccess(false);
}
// persist the final release state and the operation log for both outcomes
applicationManageService.updateRelease(app);
applicationLogService.save(applicationLog);
// re-initialise the HTTP watcher when it is tracking this app
if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
flinkAppHttpWatcher.init();
}
}
});
// save docker resolve progress detail to cache, only for flink-k8s application mode.
if (PipelineTypeEnum.FLINK_NATIVE_K8S_APPLICATION == pipeline.pipeType()) {
registerDockerProgressWatcher(pipeline, app);
}
// save pipeline instance snapshot to db before release it.
ApplicationBuildPipeline buildPipeline =
ApplicationBuildPipeline.initFromPipeline(pipeline).setAppId(app.getId());
boolean saved = saveEntity(buildPipeline);
// drop any docker pull/build/push progress entries cached for this app
DOCKER_PULL_PG_SNAPSHOTS.invalidate(app.getId());
DOCKER_BUILD_PG_SNAPSHOTS.invalidate(app.getId());
DOCKER_PUSH_PG_SNAPSHOTS.invalidate(app.getId());
// async release pipeline
// NOTE(review): the returned Future is discarded, so a failure inside launch() is not observed
// here; the pipeline is also launched even when the initial snapshot was not saved.
executorService.submit((Runnable) pipeline::launch);
return saved;
}