public boolean buildApplication()

in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationBuildPipelineServiceImpl.java [178:389]


    public boolean buildApplication(@Nonnull Long appId, boolean forceBuild) {
        // check the build environment
        checkBuildEnv(appId, forceBuild);

        FlinkApplication app = applicationManageService.getById(appId);
        ApplicationLog applicationLog = getApplicationLog(app);

        // check whether a full build is required: changes to the jar or pom require a rebuild,
        // while changes to other common parameters do not
        boolean needBuild = applicationManageService.checkBuildAndUpdate(app);
        if (!needBuild) {
            applicationLog.setSuccess(true);
            applicationLogService.save(applicationLog);
            return true;
        }
        // roll back the Flink SQL if a rollback is pending
        if (app.isNeedRollback() && app.isJobTypeFlinkSqlOrCDC()) {
            flinkSqlService.rollback(app);
        }

        // 1) Flink SQL: set the dependency and team resources from the candidate (or effective) SQL
        FlinkSql newFlinkSql = flinkSqlService.getCandidate(app.getId(), CandidateTypeEnum.NEW);
        FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false);
        FlinkJobType jobType = app.getJobTypeEnum();
        if (jobType == FlinkJobType.FLINK_SQL
            || jobType == FlinkJobType.PYFLINK
            || jobType == FlinkJobType.FLINK_CDC) {
            FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql;
            AssertUtils.notNull(flinkSql);
            app.setDependency(flinkSql.getDependency());
            app.setTeamResource(flinkSql.getTeamResource());
        }

        // create pipeline instance
        BuildPipeline pipeline = createPipelineInstance(app);

        // clear history
        removeByAppId(app.getId());
        // register the pipeline progress event watcher:
        // a snapshot of the pipeline is saved to the database whenever its status changes
        pipeline.registerWatcher(
            new PipeWatcher() {

                @Override
                public void onStart(PipelineSnapshot snapshot) {
                    ApplicationBuildPipeline buildPipeline = ApplicationBuildPipeline.fromPipeSnapshot(snapshot)
                        .setAppId(app.getId());
                    saveEntity(buildPipeline);

                    app.setRelease(ReleaseStateEnum.RELEASING.get());
                    applicationManageService.updateRelease(app);

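                    // re-initialize the HTTP watcher so it picks up the updated release state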
                    if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
                        flinkAppHttpWatcher.init();
                    }

                    // 1) checkEnv
                    applicationInfoService.checkEnv(app);

                    // 2) preparatory work: stage the application's resources
                    String appUploads = app.getWorkspace().APP_UPLOADS();

                    if (app.isJobTypeFlinkJarOrPyFlink()) {
                        // Flink JAR / PyFlink job: upload the main jar to appHome
                        String appHome = app.getAppHome();
                        FsOperator fsOperator = app.getFsOperator();
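                        // clear the previous release from appHome before staging the new artifacts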
                        fsOperator.delete(appHome);
                        if (app.isResourceFromUpload()) {
                            String uploadJar = appUploads.concat("/").concat(app.getJar());
                            File localJar = new File(
                                String.format(
                                    "%s/%d/%s",
                                    Workspace.local().APP_UPLOADS(),
                                    app.getTeamId(),
                                    app.getJar()));
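                            // if the jar is not in the team's local uploads dir, fall back to the resource center, then to the temp dir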
                            if (!localJar.exists()) {
                                Resource resource = resourceService.findByResourceName(app.getTeamId(),
                                    app.getJar());
                                if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
                                    localJar = new File(resource.getFilePath());
                                    uploadJar = appUploads.concat("/").concat(localJar.getName());
                                } else {
                                    localJar =
                                        new File(WebUtils.getAppTempDir(), app.getJar());
                                    uploadJar = appUploads.concat("/").concat(localJar.getName());
                                }
                            }
                            // make sure the jar exists in the uploads dir (upload it if missing or changed), then copy it into place below
                            checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar, appUploads);

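                            // place the jar according to the application type: StreamPark jobs use appLib, Apache Flink jobs use appHome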
                            switch (app.getApplicationType()) {
                                case STREAMPARK_FLINK:
                                    fsOperator.mkdirs(app.getAppLib());
                                    fsOperator.copy(uploadJar, app.getAppLib(), false, true);
                                    break;
                                case APACHE_FLINK:
                                    fsOperator.mkdirs(appHome);
                                    fsOperator.copy(uploadJar, appHome, false, true);
                                    break;
                                default:
                                    throw new IllegalArgumentException(
                                        "[StreamPark] unsupported ApplicationType of FlinkJar: "
                                            + app.getApplicationType());
                            }
                        } else {
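                            // the resource comes from a project build: upload the built distribution (distHome) to appHome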
                            fsOperator.upload(app.getDistHome(), appHome);
                        }
                    } else {
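                        // Flink SQL / CDC job: make sure the declared dependency jars exist in the local uploads dir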
                        if (!app.getDependencyObject().getJar().isEmpty()) {
                            String localUploads = Workspace.local().APP_UPLOADS();
                            // copy jar to local upload dir
                            for (String jar : app.getDependencyObject().getJar()) {
                                File localJar = new File(WebUtils.getAppTempDir(), jar);
                                File uploadJar = new File(localUploads, jar);
                                if (!localJar.exists() && !uploadJar.exists()) {
                                    throw new ApiAlertException(
                                        "Missing file: " + jar + ", please upload again");
                                }
                                if (localJar.exists()) {
                                    checkOrElseUploadJar(
                                        FsOperator.lfs(), localJar, uploadJar.getAbsolutePath(),
                                        localUploads);
                                }
                            }
                        }
                    }
                }

                @Override
                public void onStepStateChange(PipelineSnapshot snapshot) {
                    ApplicationBuildPipeline buildPipeline = ApplicationBuildPipeline.fromPipeSnapshot(snapshot)
                        .setAppId(app.getId());
                    saveEntity(buildPipeline);
                }

                @Override
                public void onFinish(PipelineSnapshot snapshot, BuildResult result) {
                    ApplicationBuildPipeline buildPipeline = ApplicationBuildPipeline.fromPipeSnapshot(snapshot)
                        .setAppId(app.getId())
                        .setBuildResult(result);
                    saveEntity(buildPipeline);
                    if (result.pass()) {
                        // if the job is running, a restart is required for the new release to take effect
                        if (app.isRunning()) {
                            app.setRelease(ReleaseStateEnum.NEED_RESTART.get());
                        } else {
                            app.setOptionState(OptionStateEnum.NONE.getValue());
                            app.setRelease(ReleaseStateEnum.DONE.get());
                            // If the current task is not running, or the task has just been added,
                            // directly set the candidate version as the effective version.
                            if (app.isJobTypeFlinkSqlOrCDC()) {
                                applicationManageService.toEffective(app);
                            } else {
                                if (app.isAppTypeStreamPark()) {
                                    FlinkApplicationConfig config =
                                        applicationConfigService.getLatest(app.getId());
                                    if (config != null) {
                                        config.setToApplication(app);
                                        applicationConfigService.toEffective(app.getId(),
                                            app.getConfigId());
                                    }
                                }
                            }
                        }
                        // backup.
                        if (!app.isNeedRollback()) {
                            if (app.isJobTypeFlinkSqlOrCDC() && newFlinkSql != null) {
                                backUpService.backup(app, newFlinkSql);
                            } else {
                                backUpService.backup(app, null);
                            }
                        }
                        applicationLog.setSuccess(true);
                        app.setBuild(false);

                    } else {
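                        // build failed: notify the user, mark the release as FAILED and record the exception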
                        Message message = new Message(
                            ServiceHelper.getUserId(),
                            app.getId(),
                            app.getJobName().concat(" release failed"),
                            ExceptionUtils.stringifyException(snapshot.error().exception()),
                            NoticeTypeEnum.EXCEPTION);
                        messageService.push(message);
                        app.setRelease(ReleaseStateEnum.FAILED.get());
                        app.setOptionState(OptionStateEnum.NONE.getValue());
                        app.setBuild(true);
                        applicationLog.setException(
                            ExceptionUtils.stringifyException(snapshot.error().exception()));
                        applicationLog.setSuccess(false);
                    }
                    applicationManageService.updateRelease(app);
                    applicationLogService.save(applicationLog);
                    if (flinkAppHttpWatcher.isWatchingApp(app.getId())) {
                        flinkAppHttpWatcher.init();
                    }
                }
            });
        // save the Docker resolve progress details to the cache; only for the flink-k8s application mode
        if (PipelineTypeEnum.FLINK_NATIVE_K8S_APPLICATION == pipeline.pipeType()) {
            registerDockerProgressWatcher(pipeline, app);
        }
        // save a snapshot of the pipeline instance to the database before launching it
        ApplicationBuildPipeline buildPipeline =
            ApplicationBuildPipeline.initFromPipeline(pipeline).setAppId(app.getId());
        boolean saved = saveEntity(buildPipeline);
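        // drop any stale Docker pull/build/push progress snapshots for this application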
        DOCKER_PULL_PG_SNAPSHOTS.invalidate(app.getId());
        DOCKER_BUILD_PG_SNAPSHOTS.invalidate(app.getId());
        DOCKER_PUSH_PG_SNAPSHOTS.invalidate(app.getId());
        // launch the build pipeline asynchronously
        executorService.submit((Runnable) pipeline::launch);
        return saved;
    }