private Tuple2 getUserJarAndAppConf()

in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationActionServiceImpl.java [664:790]


    private Tuple2<String, String> getUserJarAndAppConf(
                                                        FlinkEnv flinkEnv, FlinkApplication application) {
        FlinkDeployMode deployModeEnum = application.getDeployModeEnum();
        FlinkApplicationConfig applicationConfig = configService.getEffective(application.getId());

        ApiAlertException.throwIfNull(
            deployModeEnum, "DeployMode can't be null, start application failed.");

        String flinkUserJar = null;
        String appConf = null;

        switch (application.getJobTypeEnum()) {
            case FLINK_SQL:
                FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), false);
                AssertUtils.notNull(flinkSql);
                // 1) dist_userJar
                String sqlDistJar = ServiceHelper.getFlinkSqlClientJar(flinkEnv);
                // 2) appConfig
                appConf =
                    applicationConfig == null
                        ? null
                        : String.format("yaml://%s", applicationConfig.getContent());
                // 3) client
                if (FlinkDeployMode.YARN_APPLICATION == deployModeEnum) {
                    String clientPath = Workspace.remote().APP_CLIENT();
                    flinkUserJar = String.format("%s/%s", clientPath, sqlDistJar);
                }
                break;

            case FLINK_CDC:
                log.info("the current job id: {}", application.getId());
                FlinkSql flinkCDC = flinkSqlService.getEffective(application.getId(), false);
                AssertUtils.notNull(flinkCDC);
                // 1) dist_userJar
                String cdcDistJar = ServiceHelper.getFlinkCDCClientJar(flinkEnv);
                // 2) appConfig
                appConf =
                    applicationConfig == null
                        ? null
                        : String.format("yaml://%s", applicationConfig.getContent());
                // 3) client
                if (FlinkDeployMode.YARN_APPLICATION == deployModeEnum) {
                    String clientPath = Workspace.remote().APP_CLIENT();
                    flinkUserJar = String.format("%s/%s", clientPath, cdcDistJar);
                }
                break;

            case PYFLINK:
                Resource resource =
                    resourceService.findByResourceName(application.getTeamId(), application.getJar());

                ApiAlertException.throwIfNull(
                    resource, "pyflink file can't be null, start application failed.");

                ApiAlertException.throwIfNull(
                    resource.getFilePath(), "pyflink file can't be null, start application failed.");

                ApiAlertException.throwIfFalse(
                    resource.getFilePath().endsWith(Constants.PYTHON_SUFFIX),
                    "pyflink format error, must be a \".py\" suffix, start application failed.");

                flinkUserJar = resource.getFilePath();
                break;

            case FLINK_JAR:
                if (application.isResourceFromUpload()) {
                    appConf =
                        String.format(
                            "json://{\"%s\":\"%s\"}",
                            ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
                } else {
                    switch (application.getApplicationType()) {
                        case STREAMPARK_FLINK:
                            ConfigFileTypeEnum fileType = ConfigFileTypeEnum.of(applicationConfig.getFormat());
                            if (fileType != null && ConfigFileTypeEnum.UNKNOWN != fileType) {
                                appConf =
                                    String.format(
                                        "%s://%s", fileType.getTypeName(), applicationConfig.getContent());
                            } else {
                                throw new IllegalArgumentException(
                                    "application' config type error,must be ( yaml| properties| hocon )");
                            }
                            break;
                        case APACHE_FLINK:
                            appConf =
                                String.format(
                                    "json://{\"%s\":\"%s\"}",
                                    ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
                            break;
                        default:
                            throw new IllegalArgumentException(
                                "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
                    }
                }

                if (FlinkDeployMode.YARN_APPLICATION == deployModeEnum) {
                    switch (application.getApplicationType()) {
                        case STREAMPARK_FLINK:
                            flinkUserJar =
                                String.format(
                                    "%s/%s",
                                    application.getAppLib(),
                                    application.getModule().concat(Constants.JAR_SUFFIX));
                            break;
                        case APACHE_FLINK:
                            flinkUserJar = String.format("%s/%s", application.getAppHome(), application.getJar());
                            if (!FsOperator.hdfs().exists(flinkUserJar)) {
                                resource =
                                    resourceService.findByResourceName(
                                        application.getTeamId(), application.getJar());
                                if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
                                    flinkUserJar =
                                        String.format(
                                            "%s/%s",
                                            application.getAppHome(), new File(resource.getFilePath()).getName());
                                }
                            }
                            break;
                        default:
                            throw new IllegalArgumentException(
                                "[StreamPark] ApplicationType must be (StreamPark flink | Apache flink)... ");
                    }
                }
                break;
        }
        return Tuple2.of(flinkUserJar, appConf);
    }