public boolean buildApplication()

in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java [169:382]


  /**
   * Releases (builds) the application identified by {@code appId}.
   *
   * <p>Steps, in order:
   *
   * <ol>
   *   <li>validate the build environment via {@code checkBuildEnv};
   *   <li>prepare an {@code ApplicationLog} entry whose option name is the RELEASE value;
   *   <li>ask {@code applicationService.checkBuildAndUpdate} whether a build is required; if not,
   *       save the log as successful and return {@code true} immediately;
   *   <li>for Flink SQL jobs, optionally roll back, then copy the dependency and team resource of
   *       the NEW candidate SQL (or the effective SQL when there is no candidate) onto the app;
   *   <li>create the build pipeline, register watchers that persist every pipeline snapshot and
   *       update the application's release state, then submit the pipeline launch to
   *       {@code executorService}.
   * </ol>
   *
   * <p>The launch is handed to {@code executorService} and is not awaited here, so the return
   * value does not describe the outcome of the build itself.
   *
   * @param appId id of the application to build
   * @param forceBuild forwarded to {@code checkBuildEnv}; not used elsewhere in this method
   * @return {@code true} when no build was needed; otherwise the result of saving the initial
   *     pipeline snapshot via {@code saveEntity}
   */
  public boolean buildApplication(Long appId, boolean forceBuild) {
    // check the build environment
    checkBuildEnv(appId, forceBuild);

    // NOTE(review): the result of getById is dereferenced below without a null check;
    // presumably checkBuildEnv already guarantees that the application exists - confirm.
    Application app = applicationService.getById(appId);
    // This log entry is saved exactly once: either on the "no build needed" early return below,
    // or at the end of the onFinish callback.
    ApplicationLog applicationLog = new ApplicationLog();
    applicationLog.setOptionName(RELEASE.getValue());
    applicationLog.setAppId(app.getId());
    applicationLog.setOptionTime(new Date());

    // check if you need to go through the build process (if the jar and pom have changed,
    // you need to go through the build process, if other common parameters are modified,
    // you don't need to go through the build process)
    boolean needBuild = applicationService.checkBuildAndUpdate(app);
    if (!needBuild) {
      // nothing to build: record the release as successful and stop here
      applicationLog.setSuccess(true);
      applicationLogService.save(applicationLog);
      return true;
    }
    // rollback (only applies to Flink SQL jobs that are flagged as needing a rollback)
    if (app.isNeedRollback() && app.isFlinkSqlJob()) {
      flinkSqlService.rollback(app);
    }

    // 1) flink sql setDependency
    // NOTE(review): both lookups are issued for every job type, although their results are only
    // consumed for Flink SQL jobs (here and in onFinish).
    FlinkSql newFlinkSql = flinkSqlService.getCandidate(app.getId(), CandidateType.NEW);
    FlinkSql effectiveFlinkSql = flinkSqlService.getEffective(app.getId(), false);
    if (app.isFlinkSqlJob()) {
      // prefer the NEW candidate; fall back to the effective SQL when there is no candidate
      FlinkSql flinkSql = newFlinkSql == null ? effectiveFlinkSql : newFlinkSql;
      Utils.notNull(flinkSql);
      app.setDependency(flinkSql.getDependency());
      app.setTeamResource(flinkSql.getTeamResource());
    }

    // create pipeline instance
    BuildPipeline pipeline = createPipelineInstance(app);

    // clear history
    removeApp(app.getId());
    // register pipeline progress event watcher.
    // save snapshot of pipeline to db when status of pipeline was changed.
    pipeline.registerWatcher(
        new PipeWatcher() {
          /**
           * Persists the starting snapshot, marks the application as RELEASING, checks the
           * environment and stages the jars the build needs.
           */
          @Override
          public void onStart(PipeSnapshot snapshot) {
            AppBuildPipeline buildPipeline =
                AppBuildPipeline.fromPipeSnapshot(snapshot).setAppId(app.getId());
            saveEntity(buildPipeline);

            app.setRelease(ReleaseState.RELEASING.get());
            applicationService.updateRelease(app);

            // re-initialize the http watcher only if it is currently watching this app
            if (flinkHttpWatcher.isWatchingApp(app.getId())) {
              flinkHttpWatcher.init();
            }

            // 1) checkEnv
            applicationService.checkEnv(app);

            // 2) some preparatory work
            // upload directory taken from the application's own workspace
            String appUploads = app.getWorkspace().APP_UPLOADS();

            if (app.isCustomCodeJob()) {
              // customCode upload jar to appHome...
              String appHome = app.getAppHome();
              FsOperator fsOperator = app.getFsOperator();
              // remove the existing app home before it is re-populated below
              fsOperator.delete(appHome);
              if (app.isUploadJob()) {
                // the uploaded jar is looked up on the local workspace under
                // <local APP_UPLOADS>/<teamId>/<jar>
                File localJar =
                    new File(
                        String.format(
                            "%s/%d/%s",
                            Workspace.local().APP_UPLOADS(), app.getTeamId(), app.getJar()));
                // upload jar copy to appHome
                String uploadJar = appUploads.concat("/").concat(app.getJar());
                // presumably uploads localJar to uploadJar unless an up-to-date copy is already
                // there - TODO confirm against checkOrElseUploadJar
                checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar, appUploads);

                // the copy destination depends on the application type:
                // STREAMPARK_FLINK -> app lib dir, APACHE_FLINK -> app home
                switch (app.getApplicationType()) {
                  case STREAMPARK_FLINK:
                    fsOperator.mkdirs(app.getAppLib());
                    fsOperator.copy(uploadJar, app.getAppLib(), false, true);
                    break;
                  case APACHE_FLINK:
                    fsOperator.mkdirs(appHome);
                    fsOperator.copy(uploadJar, appHome, false, true);
                    break;
                  default:
                    throw new IllegalArgumentException(
                        "[StreamPark] unsupported ApplicationType of custom code: "
                            + app.getApplicationType());
                }
              } else {
                // custom code job that is not an upload job: upload the dist home to the app home
                fsOperator.upload(app.getDistHome(), appHome);
              }
            } else {
              // not a custom code job: make sure every dependency jar is available in the local
              // upload directory
              if (!app.getDependencyObject().getJar().isEmpty()) {
                String localUploads = Workspace.local().APP_UPLOADS();
                // copy jar to local upload dir
                for (String jar : app.getDependencyObject().getJar()) {
                  File localJar = new File(WebUtils.getAppTempDir(), jar);
                  File uploadJar = new File(localUploads, jar);
                  // the jar must exist in the temp dir or already be in the upload dir
                  if (!localJar.exists() && !uploadJar.exists()) {
                    throw new ApiAlertException("Missing file: " + jar + ", please upload again");
                  }
                  // a copy in the temp dir is pushed to the upload dir through the local
                  // file system operator
                  if (localJar.exists()) {
                    checkOrElseUploadJar(
                        FsOperator.lfs(), localJar, uploadJar.getAbsolutePath(), localUploads);
                  }
                }
              }
            }
          }

          /** Persists the pipeline snapshot each time a step changes state. */
          @Override
          public void onStepStateChange(PipeSnapshot snapshot) {
            AppBuildPipeline buildPipeline =
                AppBuildPipeline.fromPipeSnapshot(snapshot).setAppId(app.getId());
            saveEntity(buildPipeline);
          }

          /**
           * Persists the final snapshot together with the build result, updates the application's
           * release state accordingly and saves the release log.
           */
          @Override
          public void onFinish(PipeSnapshot snapshot, BuildResult result) {
            AppBuildPipeline buildPipeline =
                AppBuildPipeline.fromPipeSnapshot(snapshot)
                    .setAppId(app.getId())
                    .setBuildResult(result);
            saveEntity(buildPipeline);
            if (result.pass()) {
              // running job ...
              if (app.isRunning()) {
                // the job is running: flag that it needs a restart
                app.setRelease(ReleaseState.NEED_RESTART.get());
              } else {
                app.setOptionState(OptionState.NONE.getValue());
                app.setRelease(ReleaseState.DONE.get());
                // If the current task is not running, or the task has just been added, directly set
                // the candidate version to the official version
                if (app.isFlinkSqlJob()) {
                  applicationService.toEffective(app);
                } else {
                  if (app.isStreamParkJob()) {
                    // apply the latest config to the app, then make that config effective
                    ApplicationConfig config = applicationConfigService.getLatest(app.getId());
                    if (config != null) {
                      config.setToApplication(app);
                      applicationConfigService.toEffective(app.getId(), app.getConfigId());
                    }
                  }
                }
              }
              // backup.
              // skipped when the app is flagged as needing a rollback; the candidate SQL is
              // passed along only for Flink SQL jobs that have one
              if (!app.isNeedRollback()) {
                if (app.isFlinkSqlJob() && newFlinkSql != null) {
                  backUpService.backup(app, newFlinkSql);
                } else {
                  backUpService.backup(app, null);
                }
              }
              applicationLog.setSuccess(true);
              // presumably "no build pending any more" - TODO confirm the meaning of the flag
              app.setBuild(false);

            } else {
              // build failed: push an EXCEPTION notice carrying the stringified error
              // NOTE(review): this callback presumably runs on the thread executing
              // pipeline.launch(), not the request thread - confirm that
              // commonService.getUserId() can resolve a user there.
              Message message =
                  new Message(
                      commonService.getUserId(),
                      app.getId(),
                      app.getJobName().concat(" release failed"),
                      Utils.stringifyException(snapshot.error().exception()),
                      NoticeType.EXCEPTION);
              messageService.push(message);
              app.setRelease(ReleaseState.FAILED.get());
              app.setOptionState(OptionState.NONE.getValue());
              app.setBuild(true);
              applicationLog.setException(Utils.stringifyException(snapshot.error().exception()));
              applicationLog.setSuccess(false);
            }
            // in both outcomes: persist the release state and the log entry, then re-initialize
            // the http watcher if it is watching this app
            applicationService.updateRelease(app);
            applicationLogService.save(applicationLog);
            if (flinkHttpWatcher.isWatchingApp(app.getId())) {
              flinkHttpWatcher.init();
            }
          }
        });
    // save docker resolve progress detail to cache, only for flink-k8s application mode.
    if (PipelineType.FLINK_NATIVE_K8S_APPLICATION == pipeline.pipeType()) {
      pipeline
          .as(FlinkK8sApplicationBuildPipeline.class)
          .registerDockerProgressWatcher(
              new DockerProgressWatcher() {
                // each callback stores the latest snapshot of its phase, keyed by the app id
                @Override
                public void onDockerPullProgressChange(DockerPullSnapshot snapshot) {
                  DOCKER_PULL_PG_SNAPSHOTS.put(app.getId(), snapshot);
                }

                @Override
                public void onDockerBuildProgressChange(DockerBuildSnapshot snapshot) {
                  DOCKER_BUILD_PG_SNAPSHOTS.put(app.getId(), snapshot);
                }

                @Override
                public void onDockerPushProgressChange(DockerPushSnapshot snapshot) {
                  DOCKER_PUSH_PG_SNAPSHOTS.put(app.getId(), snapshot);
                }
              });
    }
    // save pipeline instance snapshot to db before release it.
    AppBuildPipeline buildPipeline =
        AppBuildPipeline.initFromPipeline(pipeline).setAppId(app.getId());
    boolean saved = saveEntity(buildPipeline);
    // drop docker progress snapshots cached for this app before the new run starts
    DOCKER_PULL_PG_SNAPSHOTS.invalidate(app.getId());
    DOCKER_BUILD_PG_SNAPSHOTS.invalidate(app.getId());
    DOCKER_PUSH_PG_SNAPSHOTS.invalidate(app.getId());
    // async release pipeline
    // NOTE(review): the value returned by submit is discarded, and the pipeline is launched even
    // when `saved` is false. Assuming executorService is a java.util.concurrent.ExecutorService,
    // an exception escaping launch() would stay in that Future unobserved - confirm that
    // launch() reports its own failures through the watcher.
    executorService.submit((Runnable) pipeline::launch);
    return saved;
  }