public void start(FlinkApplication appParam, boolean auto) throws Exception

in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java [409:527]


    public void start(FlinkApplication appParam, boolean auto) throws Exception {
        // For HA: if this task is not handled by the current node, persist it as a distributed task and return
        if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
            distributedTaskService.saveDistributedTask(appParam, auto, DistributedTaskEnum.START);
            return;
        }
        // 1) check application
        final FlinkApplication application = getById(appParam.getId());
        AssertUtils.notNull(application);
        ApiAlertException.throwIfTrue(
            !application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly.");

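        // remote and session modes run against an existing cluster, so extra pre-start checks apply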
        if (FlinkDeployMode.isRemoteMode(application.getDeployModeEnum())
            || FlinkDeployMode.isSessionMode(application.getDeployModeEnum())) {
            checkBeforeStart(application);
        }

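        // in YARN modes, refuse to start if an application with the same job name is already running on YARN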
        if (FlinkDeployMode.isYarnMode(application.getDeployModeEnum())) {
            ApiAlertException.throwIfTrue(
                !applicationInfoService.getYarnAppReport(application.getJobName()).isEmpty(),
                "[StreamPark] A job with the same name is already running in the YARN queue");
        }

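        // the build pipeline for this application must exist; its result provides the artifacts to submit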
        ApplicationBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
        AssertUtils.notNull(buildPipeline);

        FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
        ApiAlertException.throwIfNull(flinkEnv, "[StreamPark] cannot find the Flink version");

        // a manual start resets the restart count; an auto restart only proceeds when restart-on-failure is required
        if (!auto) {
            application.setRestartCount(0);
        } else {
            if (!application.isNeedRestartOnFailed()) {
                return;
            }
            appParam.setRestoreOrTriggerSavepoint(true);
            application.setRestartCount(application.getRestartCount() + 1);
        }
        // 2) update app state to starting...
        starting(application);
        ApplicationLog applicationLog = constructAppLog(application);
        // mark the latest resource as effective (it only becomes the current effective version at this point)
        applicationManageService.toEffective(application);

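        // extra parameters forwarded to the Flink client; for SQL/CDC jobs this carries the resolved SQL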
        Map<String, Object> extraParameter = new HashMap<>(0);
        if (application.isJobTypeFlinkSqlOrCDC()) {
            FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
            // Replace variable placeholders in the SQL
            String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
            flinkSql.setSql(DeflaterUtils.zipString(realSql));
            extraParameter.put(ConfigKeys.KEY_FLINK_SQL(null), flinkSql.getSql());
        }

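        // resolve the user jar and the application configuration to submit with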
        Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(flinkEnv, application);
        String flinkUserJar = userJarAndAppConf.t1;
        String appConf = userJarAndAppConf.t2;

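        // yarn-application mode submits the user jar directly, wrapped in a ShadedBuildResponse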
        BuildResult buildResult = buildPipeline.getBuildResult();
        if (FlinkDeployMode.YARN_APPLICATION == application.getDeployModeEnum()) {
            buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
        }

        // Get the args after placeholder replacement
        String args =
            StringUtils.isBlank(appParam.getArgs()) ? application.getArgs() : appParam.getArgs();
        String applicationArgs = variableService.replaceVariable(application.getTeamId(), args);

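        // resolve the Kubernetes namespace, cluster id and REST exposed type (relevant for K8s deploy modes)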
        Tuple3<String, String, FlinkK8sRestExposedType> clusterIdNamespace =
            getNamespaceClusterId(application);
        String k8sNamespace = clusterIdNamespace.t1;
        String k8sClusterId = clusterIdNamespace.t2;
        FlinkK8sRestExposedType exposedType = clusterIdNamespace.t3;

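        // dynamic properties from the request take precedence over those stored on the application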
        String dynamicProperties =
            StringUtils.isBlank(appParam.getDynamicProperties())
                ? application.getDynamicProperties()
                : appParam.getDynamicProperties();

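        // assemble the submit request for the Flink client, generating a fresh JobID for this run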
        SubmitRequest submitRequest =
            new SubmitRequest(
                flinkEnv.getFlinkVersion(),
                FlinkDeployMode.of(application.getDeployMode()),
                getProperties(application, dynamicProperties),
                flinkEnv.getFlinkConf(),
                FlinkJobType.of(application.getJobType()),
                application.getId(),
                new JobID().toHexString(),
                application.getJobName(),
                appConf,
                application.getApplicationType(),
                getSavepointPath(appParam),
                FlinkRestoreMode.of(appParam.getRestoreMode()),
                applicationArgs,
                k8sClusterId,
                application.getHadoopUser(),
                buildResult,
                extraParameter,
                k8sNamespace,
                exposedType);

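        // submit asynchronously so the caller is not blocked on the Flink client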
        CompletableFuture<SubmitResponse> future =
            CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), executorService);

        startFutureMap.put(application.getId(), future);

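        // when the submission finishes: drop the tracked future, then handle failure or success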
        future.whenCompleteAsync(
            (response, throwable) -> {
                // 1) remove Future
                startFutureMap.remove(application.getId());
                // 2) exception
                if (throwable != null) {
                    processForException(appParam, throwable, applicationLog, application);
                    return;
                }
                // 3) success
                processForSuccess(appParam, response, applicationLog, application);
            });
    }