public void start()

in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java [1402:1649]


  public void start(Application appParam, boolean auto) throws Exception {
    final Application application = getById(appParam.getId());
    Utils.notNull(application);
    if (!application.isCanBeStart()) {
      throw new ApiAlertException("[StreamPark] The application cannot be started repeatedly.");
    }

    FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(application.getVersionId());
    if (flinkEnv == null) {
      throw new ApiAlertException("[StreamPark] can no found flink version");
    }

    // if manually started, clear the restart flag
    if (!auto) {
      application.setRestartCount(0);
    } else {
      if (!application.isNeedRestartOnFailed()) {
        return;
      }
      appParam.setSavePointed(true);
      application.setRestartCount(application.getRestartCount() + 1);
    }

    starting(application);
    application.setAllowNonRestored(appParam.getAllowNonRestored());

    String appConf;
    String flinkUserJar = null;
    String jobId = new JobID().toHexString();
    ApplicationLog applicationLog = new ApplicationLog();
    applicationLog.setOptionName(Operation.START.getValue());
    applicationLog.setAppId(application.getId());
    applicationLog.setOptionTime(new Date());

    // set the latest to Effective, (it will only become the current effective at this time)
    this.toEffective(application);

    ApplicationConfig applicationConfig = configService.getEffective(application.getId());
    ExecutionMode executionMode = ExecutionMode.of(application.getExecutionMode());
    ApiAlertException.throwIfNull(
        executionMode, "ExecutionMode can't be null, start application failed.");
    if (application.isCustomCodeJob()) {
      if (application.isUploadJob()) {
        appConf =
            String.format(
                "json://{\"%s\":\"%s\"}",
                ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
      } else {
        switch (application.getApplicationType()) {
          case STREAMPARK_FLINK:
            ConfigFileType fileType = ConfigFileType.of(applicationConfig.getFormat());
            if (fileType != null && !fileType.equals(ConfigFileType.UNKNOWN)) {
              appConf =
                  String.format("%s://%s", fileType.getTypeName(), applicationConfig.getContent());
            } else {
              throw new IllegalArgumentException(
                  "application' config type error,must be ( yaml| properties| hocon )");
            }
            break;
          case APACHE_FLINK:
            appConf =
                String.format(
                    "json://{\"%s\":\"%s\"}",
                    ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
            break;
          default:
            throw new IllegalArgumentException(
                "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
        }
      }

      if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
        switch (application.getApplicationType()) {
          case STREAMPARK_FLINK:
            flinkUserJar =
                String.format(
                    "%s/%s", application.getAppLib(), application.getModule().concat(".jar"));
            break;
          case APACHE_FLINK:
            flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
            break;
          default:
            throw new IllegalArgumentException(
                "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
        }
      }
    } else if (application.isFlinkSqlJob()) {
      FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
      Utils.notNull(flinkSql);
      // 1) dist_userJar
      String sqlDistJar = commonService.getSqlClientJar(flinkEnv);
      // 2) appConfig
      appConf =
          applicationConfig == null
              ? null
              : String.format("yaml://%s", applicationConfig.getContent());
      // 3) client
      if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
        String clientPath = Workspace.remote().APP_CLIENT();
        flinkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
      }
    } else {
      throw new UnsupportedOperationException("Unsupported...");
    }

    Map<String, Object> extraParameter = new HashMap<>(0);
    if (application.isFlinkSqlJob()) {
      FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
      // Get the sql of the replaced placeholder
      String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
      flinkSql.setSql(DeflaterUtils.zipString(realSql));
      extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
    }

    KubernetesSubmitParam kubernetesSubmitParam =
        new KubernetesSubmitParam(
            application.getClusterId(),
            application.getK8sNamespace(),
            application.getK8sRestExposedTypeEnum());

    AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());

    Utils.notNull(buildPipeline);

    BuildResult buildResult = buildPipeline.getBuildResult();
    if (ExecutionMode.YARN_APPLICATION.equals(executionMode)) {
      buildResult = new ShadedBuildResponse(null, flinkUserJar, true);
    }

    // Get the args after placeholder replacement
    String applicationArgs =
        variableService.replaceVariable(application.getTeamId(), application.getArgs());

    SubmitRequest submitRequest =
        new SubmitRequest(
            flinkEnv.getFlinkVersion(),
            ExecutionMode.of(application.getExecutionMode()),
            getProperties(application),
            flinkEnv.getFlinkConf(),
            DevelopmentMode.of(application.getJobType()),
            application.getId(),
            jobId,
            application.getJobName(),
            appConf,
            application.getApplicationType(),
            getSavePointed(appParam),
            appParam.getRestoreMode() == null ? null : RestoreMode.of(appParam.getRestoreMode()),
            applicationArgs,
            buildResult,
            kubernetesSubmitParam,
            extraParameter);

    CompletableFuture<SubmitResponse> future =
        CompletableFuture.supplyAsync(() -> FlinkClient.submit(submitRequest), executorService);

    startFutureMap.put(application.getId(), future);

    CompletableFutureUtils.runTimeout(
            future,
            2L,
            TimeUnit.MINUTES,
            submitResponse -> {
              if (submitResponse.flinkConfig() != null) {
                String jmMemory =
                    submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY());
                if (jmMemory != null) {
                  application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
                }
                String tmMemory =
                    submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_TM_PROCESS_MEMORY());
                if (tmMemory != null) {
                  application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
                }
              }
              application.setAppId(submitResponse.clusterId());
              if (StringUtils.isNoneEmpty(submitResponse.jobId())) {
                application.setJobId(submitResponse.jobId());
              }

              if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) {
                application.setJobManagerUrl(submitResponse.jobManagerUrl());
                applicationLog.setJobManagerUrl(submitResponse.jobManagerUrl());
              }
              applicationLog.setYarnAppId(submitResponse.clusterId());
              application.setStartTime(new Date());
              application.setEndTime(null);
              if (isKubernetesApp(application)) {
                application.setRelease(ReleaseState.DONE.get());
              }
              updateById(application);

              // if start completed, will be added task to tracking queue
              if (isKubernetesApp(application)) {
                k8SFlinkTrackMonitor.doWatching(toTrackId(application));
              } else {
                FlinkHttpWatcher.setOptionState(appParam.getId(), OptionState.STARTING);
                FlinkHttpWatcher.doWatching(application);
              }

              applicationLog.setSuccess(true);
              // set savepoint to expire
              savePointService.expire(application.getId());
            },
            e -> {
              if (e.getCause() instanceof CancellationException) {
                updateToStopped(application);
              } else {
                String exception = Utils.stringifyException(e);
                applicationLog.setException(exception);
                applicationLog.setSuccess(false);
                Application app = getById(appParam.getId());
                app.setState(FlinkAppState.FAILED.getValue());
                app.setOptionState(OptionState.NONE.getValue());
                updateById(app);
                if (isKubernetesApp(app)) {
                  k8SFlinkTrackMonitor.unWatching(toTrackId(app));
                } else {
                  FlinkHttpWatcher.unWatching(appParam.getId());
                }
              }
            })
        .whenComplete(
            (t, e) -> {
              if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
                String domainName = settingService.getIngressModeDefault();
                if (StringUtils.isNotBlank(domainName)) {
                  try {
                    IngressController.configureIngress(
                        domainName, application.getClusterId(), application.getK8sNamespace());
                  } catch (KubernetesClientException kubernetesClientException) {
                    log.info(
                        "Failed to create ingress, stack info:{}",
                        kubernetesClientException.getMessage());
                    applicationLog.setException(e.getMessage());
                    applicationLog.setSuccess(false);
                    applicationLogService.save(applicationLog);
                    application.setState(FlinkAppState.FAILED.getValue());
                    application.setOptionState(OptionState.NONE.getValue());
                    updateById(application);
                    return;
                  }
                }
              }

              applicationLogService.save(applicationLog);
              startFutureMap.remove(application.getId());
            });
  }