public void start()

in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java [274:409]


    public void start(SparkApplication appParam, boolean auto) throws Exception {
        // For HA purposes, if the task is not processed locally, save the Distribution task and return
        if (!distributedTaskService.isLocalProcessing(appParam.getId())) {
            distributedTaskService.saveDistributedTask(appParam, false, DistributedTaskEnum.START);
            return;
        }
        // 1) check application
        final SparkApplication application = getById(appParam.getId());
        AssertUtils.notNull(application);
        ApiAlertException.throwIfTrue(
            !application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly.");

        SparkEnv sparkEnv = sparkEnvService.getByIdOrDefault(application.getVersionId());
        ApiAlertException.throwIfNull(sparkEnv, "[StreamPark] can no found spark version");

        if (SparkDeployMode.isYarnMode(application.getDeployModeEnum())) {
            checkYarnBeforeStart(application);
        }

        ApplicationBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
        AssertUtils.notNull(buildPipeline);

        // if manually started, clear the restart flag
        if (!auto) {
            application.setRestartCount(0);
        } else {
            if (!application.isNeedRestartOnFailed()) {
                return;
            }
            application.setRestartCount(application.getRestartCount() + 1);
        }

        // 2) update app state to starting...
        starting(application);

        ApplicationLog applicationLog = new ApplicationLog();
        applicationLog.setJobType(EngineTypeEnum.SPARK.getCode());
        applicationLog.setOptionName(SparkOperationEnum.START.getValue());
        applicationLog.setAppId(application.getId());
        applicationLog.setCreateTime(new Date());
        applicationLog.setUserId(ServiceHelper.getUserId());

        // set the latest to Effective, (it will only become the current effective at this time)
        // applicationManageService.toEffective(application);

        Map<String, Object> extraParameter = new HashMap<>(0);
        if (application.isSparkSqlJob()) {
            SparkSql sparkSql = sparkSqlService.getEffective(application.getId(), true);
            // Get the sql of the replaced placeholder
            String realSql = variableService.replaceVariable(application.getTeamId(), sparkSql.getSql());
            sparkSql.setSql(DeflaterUtils.zipString(realSql));
            extraParameter.put(ConfigKeys.KEY_SPARK_SQL(null), sparkSql.getSql());
        }

        Tuple2<String, String> userJarAndAppConf = getUserJarAndAppConf(sparkEnv, application);
        String sparkUserJar = userJarAndAppConf.f0;
        String appConf = userJarAndAppConf.f1;

        BuildResult buildResult = buildPipeline.getBuildResult();
        if (SparkDeployMode.isYarnMode(application.getDeployModeEnum())) {
            buildResult = new ShadedBuildResponse(null, sparkUserJar, true);
            if (StringUtils.isNotBlank(application.getYarnQueueName())) {
                extraParameter.put(ConfigKeys.KEY_SPARK_YARN_QUEUE_NAME(), application.getYarnQueueName());
            }
            if (StringUtils.isNotBlank(application.getYarnQueueLabel())) {
                extraParameter.put(ConfigKeys.KEY_SPARK_YARN_QUEUE_LABEL(), application.getYarnQueueLabel());
            }
        }

        // Get the args after placeholder replacement
        String applicationArgs = variableService.replaceVariable(application.getTeamId(), application.getAppArgs());

        SubmitRequest submitRequest = new SubmitRequest(
            sparkEnv.getSparkVersion(),
            SparkDeployMode.of(application.getDeployMode()),
            sparkEnv.getSparkConf(),
            SparkJobType.valueOf(application.getJobType()),
            application.getId(),
            application.getAppName(),
            application.getMainClass(),
            appConf,
            PropertiesUtils.extractSparkPropertiesAsJava(application.getAppProperties()),
            PropertiesUtils.extractSparkArgumentsAsJava(applicationArgs),
            application.getApplicationType(),
            application.getHadoopUser(),
            buildResult,
            extraParameter);

        CompletableFuture<SubmitResponse> future = CompletableFuture
            .supplyAsync(() -> SparkClient.submit(submitRequest), executorService);

        startJobFutureMap.put(application.getId(), future);
        future.whenComplete(
            (response, throwable) -> {
                // 1) remove Future
                startJobFutureMap.remove(application.getId());

                // 2) exception
                if (throwable != null) {
                    String exception = ExceptionUtils.stringifyException(throwable);
                    applicationLog.setException(exception);
                    applicationLog.setSuccess(false);
                    applicationLogService.save(applicationLog);
                    if (throwable instanceof CancellationException) {
                        doStopped(application.getId());
                    } else {
                        SparkApplication app = getById(appParam.getId());
                        app.setState(SparkAppStateEnum.FAILED.getValue());
                        app.setOptionState(SparkOptionStateEnum.NONE.getValue());
                        updateById(app);
                        SparkAppHttpWatcher.unWatching(appParam.getId());
                    }
                    return;
                }

                // 3) success
                applicationLog.setSuccess(true);
                application.resolveScheduleConf(response.sparkProperties());
                if (StringUtils.isNoneEmpty(response.sparkAppId())) {
                    application.setClusterId(response.sparkAppId());
                }
                applicationLog.setClusterId(response.sparkAppId());
                applicationLog.setTrackingUrl(response.trackingUrl());
                application.setStartTime(new Date());
                application.setEndTime(null);

                // if start completed, will be added task to tracking queue
                SparkAppHttpWatcher.setOptionState(appParam.getId(), SparkOptionStateEnum.STARTING);
                SparkAppHttpWatcher.doWatching(application);

                // update app
                updateById(application);
                // save log
                applicationLogService.save(applicationLog);
            });
    }