private Tuple2 getUserJarAndAppConf()

in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/SparkApplicationActionServiceImpl.java [442:546]


    /**
     * Resolves the user jar (or Python file) path and the application config string for the given
     * application, depending on its job type, application type and deploy mode.
     *
     * <p>The config string is the config content prefixed with its format, e.g. {@code yaml://...},
     * {@code json://...} or the type name of the declared config file format.
     *
     * @param sparkEnv the Spark environment, used to look up the Spark SQL client jar
     * @param application the application being started
     * @return a tuple of (user jar path, application config); either element may be {@code null}
     *     when the job type / deploy mode combination does not assign it
     * @throws IllegalArgumentException if the config file type or the application type is not
     *     supported
     */
    private Tuple2<String, String> getUserJarAndAppConf(
                                                        SparkEnv sparkEnv, SparkApplication application) {
        SparkDeployMode deployModeEnum = application.getDeployModeEnum();
        // Validate before touching the config service so an invalid application fails fast.
        ApiAlertException.throwIfNull(
            deployModeEnum, "DeployMode can't be null, start application failed.");

        SparkApplicationConfig applicationConfig = configService.getEffective(application.getId());

        String sparkUserJar = null;
        String appConf = null;

        switch (application.getJobTypeEnum()) {
            case SPARK_SQL:
                SparkSql sparkSql = sparkSqlService.getEffective(application.getId(), false);
                AssertUtils.notNull(sparkSql);
                // 1) dist_userJar
                String sqlDistJar = ServiceHelper.getSparkSqlClientJar(sparkEnv);
                // 2) appConfig
                appConf = toYamlAppConf(applicationConfig);
                // 3) client: the user jar is only resolved for yarn-cluster mode here
                if (SparkDeployMode.YARN_CLUSTER == deployModeEnum) {
                    String clientPath = Workspace.remote().APP_CLIENT();
                    sparkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
                }
                break;

            case PYSPARK:
                appConf = toYamlAppConf(applicationConfig);
                Resource resource = resourceService.findByResourceName(application.getTeamId(), application.getJar());

                ApiAlertException.throwIfNull(
                    resource, "pyspark file can't be null, start application failed.");

                ApiAlertException.throwIfNull(
                    resource.getFilePath(), "pyspark file can't be null, start application failed.");

                ApiAlertException.throwIfFalse(
                    resource.getFilePath().endsWith(Constants.PYTHON_SUFFIX),
                    "pyspark format error, must be a \".py\" suffix, start application failed.");

                // For PySpark the "user jar" is the Python entry file itself.
                sparkUserJar = resource.getFilePath();
                break;

            case SPARK_JAR:
                if (application.isFromUploadJob()) {
                    appConf = toYamlAppConf(applicationConfig);
                } else {
                    switch (application.getApplicationType()) {
                        case STREAMPARK_SPARK:
                            // The effective config may be absent (the other branches tolerate null);
                            // here it is required, so report it explicitly instead of hitting an NPE.
                            ApiAlertException.throwIfNull(
                                applicationConfig,
                                "Application config can't be null, start application failed.");
                            ConfigFileTypeEnum fileType = ConfigFileTypeEnum.of(applicationConfig.getFormat());
                            if (fileType != null && ConfigFileTypeEnum.UNKNOWN != fileType) {
                                appConf = String.format(
                                    "%s://%s", fileType.getTypeName(), applicationConfig.getContent());
                            } else {
                                throw new IllegalArgumentException(
                                    "application' config type error,must be ( yaml| properties| hocon )");
                            }
                            break;
                        case APACHE_SPARK:
                            // NOTE(review): a Flink-named config key is used in a Spark code path;
                            // confirm this is the key the Spark client expects for the main class.
                            appConf = String.format(
                                "json://{\"%s\":\"%s\"}",
                                ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
                            break;
                        default:
                            throw new IllegalArgumentException(
                                "[StreamPark] ApplicationType must be (StreamPark spark | Apache spark)... ");
                    }
                }

                if (SparkDeployMode.isYarnMode(deployModeEnum)) {
                    switch (application.getApplicationType()) {
                        case STREAMPARK_SPARK:
                            sparkUserJar = String.format(
                                "%s/%s",
                                application.getAppLib(),
                                application.getModule().concat(Constants.JAR_SUFFIX));
                            break;
                        case APACHE_SPARK:
                            sparkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
                            // If the jar is not found under the app home by its configured name,
                            // fall back to the file name of the matching team resource, if any.
                            if (!FsOperator.hdfs().exists(sparkUserJar)) {
                                Resource jarResource = resourceService.findByResourceName(
                                    application.getTeamId(), application.getJar());
                                if (jarResource != null && StringUtils.isNotBlank(jarResource.getFilePath())) {
                                    sparkUserJar = String.format(
                                        "%s/%s",
                                        application.getAppHome(),
                                        new File(jarResource.getFilePath()).getName());
                                }
                            }
                            break;
                        default:
                            throw new IllegalArgumentException(
                                "[StreamPark] ApplicationType must be (StreamPark spark | Apache spark)... ");
                    }
                }
                break;
        }
        return Tuple2.of(sparkUserJar, appConf);
    }

    /**
     * Formats the given config as a {@code yaml://}-prefixed config string.
     *
     * @param applicationConfig the effective application config, may be {@code null}
     * @return {@code yaml://<content>}, or {@code null} when no config is given
     */
    private static String toYamlAppConf(SparkApplicationConfig applicationConfig) {
        return applicationConfig == null
            ? null
            : String.format("yaml://%s", applicationConfig.getContent());
    }