in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java [169:382]
public boolean buildApplication(Long appId, boolean forceBuild) {
// check the build environment
checkBuildEnv(appId, forceBuild);
Application app = applicationService.getById(appId);
ApplicationLog applicationLog = new ApplicationLog();
applicationLog.setOptionName(RELEASE.getValue());
applicationLog.setAppId(app.getId());
applicationLog.setOptionTime(new Date());
// Decide whether the full build pipeline is required: a change to the jar or pom
// means the application must be rebuilt, while changes to other common parameters
// can skip the build entirely.
boolean needBuild = applicationService.checkBuildAndUpdate(app);
if (!needBuild) {
applicationLog.setSuccess(true);
applicationLogService.save(applicationLog);
return true;
}
// roll back the Flink SQL job if a rollback is pending
if (app.isNeedRollback() && app.isFlinkSqlJob()) {
flinkSqlService.rollback(app);
}
// 1) for Flink SQL jobs, take the dependency from the candidate (or effective) SQL
FlinkSql newFlinkSql = flinkSqlService.getCandidate(app.getId(), CandidateType.NEW);
FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false);
if (app.isFlinkSqlJob()) {
FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql;
Utils.notNull(flinkSql);
app.setDependency(flinkSql.getDependency());
app.setTeamResource(flinkSql.getTeamResource());
}
// create pipeline instance
BuildPipeline pipeline = createPipelineInstance(app);
// clear the previous build pipeline record for this application
removeApp(app.getId());
// Register the pipeline progress watcher; a snapshot of the pipeline is persisted
// to the database whenever the pipeline status changes.
pipeline.registerWatcher(
new PipeWatcher() {
@Override
public void onStart(PipeSnapshot snapshot) {
AppBuildPipeline buildPipeline =
AppBuildPipeline.fromPipeSnapshot(snapshot).setAppId(app.getId());
saveEntity(buildPipeline);
app.setRelease(ReleaseState.RELEASING.get());
applicationService.updateRelease(app);
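// if this application is being watched, re-initialize the HTTP watcher so it reflects the updated release state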
if (flinkHttpWatcher.isWatchingApp(app.getId())) {
flinkHttpWatcher.init();
}
// 1) checkEnv
applicationService.checkEnv(app);
// 2) preparatory work: stage the job artifacts into the workspace
String appUploads = app.getWorkspace().APP_UPLOADS();
if (app.isCustomCodeJob()) {
// custom-code job: upload the jar to appHome
String appHome = app.getAppHome();
FsOperator fsOperator = app.getFsOperator();
fsOperator.delete(appHome);
if (app.isUploadJob()) {
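// the user-uploaded jar is staged under the local uploads directory, scoped by team id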
File localJar =
new File(
String.format(
"%s/%d/%s",
Workspace.local().APP_UPLOADS(), app.getTeamId(), app.getJar()));
// ensure the jar exists in the remote uploads dir, then copy it into place below
String uploadJar = appUploads.concat("/").concat(app.getJar());
checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar, appUploads);
switch (app.getApplicationType()) {
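// StreamPark-managed Flink jobs keep the jar under appLib; plain Apache Flink jobs keep it under appHome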
case STREAMPARK_FLINK:
fsOperator.mkdirs(app.getAppLib());
fsOperator.copy(uploadJar, app.getAppLib(), false, true);
break;
case APACHE_FLINK:
fsOperator.mkdirs(appHome);
fsOperator.copy(uploadJar, appHome, false, true);
break;
default:
throw new IllegalArgumentException(
"[StreamPark] unsupported ApplicationType of custom code: "
+ app.getApplicationType());
}
} else {
fsOperator.upload(app.getDistHome(), appHome);
}
} else {
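// Flink SQL job: make sure every declared dependency jar is present in the local uploads dir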
if (!app.getDependencyObject().getJar().isEmpty()) {
String localUploads = Workspace.local().APP_UPLOADS();
// copy jar to local upload dir
for (String jar : app.getDependencyObject().getJar()) {
File localJar = new File(WebUtils.getAppTempDir(), jar);
File uploadJar = new File(localUploads, jar);
if (!localJar.exists() && !uploadJar.exists()) {
throw new ApiAlertException("Missing file: " + jar + ", please upload again");
}
if (localJar.exists()) {
checkOrElseUploadJar(
FsOperator.lfs(), localJar, uploadJar.getAbsolutePath(), localUploads);
}
}
}
}
}
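// persist a pipeline snapshot on every step-state transition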
@Override
public void onStepStateChange(PipeSnapshot snapshot) {
AppBuildPipeline buildPipeline =
AppBuildPipeline.fromPipeSnapshot(snapshot).setAppId(app.getId());
saveEntity(buildPipeline);
}
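// persist the final snapshot together with the build result, then update the release state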
@Override
public void onFinish(PipeSnapshot snapshot, BuildResult result) {
AppBuildPipeline buildPipeline =
AppBuildPipeline.fromPipeSnapshot(snapshot)
.setAppId(app.getId())
.setBuildResult(result);
saveEntity(buildPipeline);
if (result.pass()) {
// the job is currently running: the new release requires a restart to take effect
if (app.isRunning()) {
app.setRelease(ReleaseState.NEED_RESTART.get());
} else {
app.setOptionState(OptionState.NONE.getValue());
app.setRelease(ReleaseState.DONE.get());
// If the job is not running, or has just been created, promote the candidate
// version to the effective version directly.
if (app.isFlinkSqlJob()) {
applicationService.toEffective(app);
} else {
if (app.isStreamParkJob()) {
ApplicationConfig config = applicationConfigService.getLatest(app.getId());
if (config != null) {
config.setToApplication(app);
applicationConfigService.toEffective(app.getId(), app.getConfigId());
}
}
}
}
// back up the previous release unless a rollback is pending
if (!app.isNeedRollback()) {
if (app.isFlinkSqlJob() && newFlinkSql != null) {
backUpService.backup(app, newFlinkSql);
} else {
backUpService.backup(app, null);
}
}
applicationLog.setSuccess(true);
app.setBuild(false);
} else {
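// build failed: push an exception message and record the failure in the release log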
Message message =
new Message(
commonService.getUserId(),
app.getId(),
app.getJobName().concat(" release failed"),
Utils.stringifyException(snapshot.error().exception()),
NoticeType.EXCEPTION);
messageService.push(message);
app.setRelease(ReleaseState.FAILED.get());
app.setOptionState(OptionState.NONE.getValue());
app.setBuild(true);
applicationLog.setException(Utils.stringifyException(snapshot.error().exception()));
applicationLog.setSuccess(false);
}
applicationService.updateRelease(app);
applicationLogService.save(applicationLog);
if (flinkHttpWatcher.isWatchingApp(app.getId())) {
flinkHttpWatcher.init();
}
}
});
// cache the docker resolve progress details; this only applies to the flink-k8s application mode.
if (PipelineType.FLINK_NATIVE_K8S_APPLICATION == pipeline.pipeType()) {
pipeline
.as(FlinkK8sApplicationBuildPipeline.class)
.registerDockerProgressWatcher(
new DockerProgressWatcher() {
@Override
public void onDockerPullProgressChange(DockerPullSnapshot snapshot) {
DOCKER_PULL_PG_SNAPSHOTS.put(app.getId(), snapshot);
}
@Override
public void onDockerBuildProgressChange(DockerBuildSnapshot snapshot) {
DOCKER_BUILD_PG_SNAPSHOTS.put(app.getId(), snapshot);
}
@Override
public void onDockerPushProgressChange(DockerPushSnapshot snapshot) {
DOCKER_PUSH_PG_SNAPSHOTS.put(app.getId(), snapshot);
}
});
}
// save a snapshot of the pipeline instance to the database before launching it.
AppBuildPipeline buildPipeline =
AppBuildPipeline.initFromPipeline(pipeline).setAppId(app.getId());
boolean saved = saveEntity(buildPipeline);
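// clear any cached docker progress snapshots left over from a previous build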
DOCKER_PULL_PG_SNAPSHOTS.invalidate(app.getId());
DOCKER_BUILD_PG_SNAPSHOTS.invalidate(app.getId());
DOCKER_PUSH_PG_SNAPSHOTS.invalidate(app.getId());
// launch the release pipeline asynchronously
executorService.submit((Runnable) pipeline::launch);
return saved;
}