private List<String> populateSparkOptions()

in dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java [125:229]


    private List<String> populateSparkOptions() {
        List<String> args = new ArrayList<>();

        // see https://spark.apache.org/docs/latest/submitting-applications.html
        // TODO remove the option 'local' from deploy-mode
        String deployMode = StringUtils.isNotEmpty(sparkParameters.getDeployMode()) ? sparkParameters.getDeployMode()
                : SparkConstants.DEPLOY_MODE_LOCAL;

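        // Local mode and native Kubernetes (signalled by a configured namespace) are handled specially below.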
        boolean onLocal = SparkConstants.DEPLOY_MODE_LOCAL.equals(deployMode);
        boolean onNativeKubernetes = StringUtils.isNotEmpty(sparkParameters.getNamespace());

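        // Resolve the master URL: an explicitly configured master wins; otherwise fall back to
        // "local", the Kubernetes API server taken from the kubeconfig, or YARN by default.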
        String masterUrl;
        if (StringUtils.isNotEmpty(sparkParameters.getMaster())) {
            masterUrl = sparkParameters.getMaster();
        } else if (onLocal) {
            masterUrl = deployMode;
        } else if (onNativeKubernetes) {
            masterUrl = SPARK_ON_K8S_MASTER_PREFIX
                    + Config.fromKubeconfig(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml())
                            .getMasterUrl();
        } else {
            masterUrl = SparkConstants.SPARK_ON_YARN;
        }

        args.add(SparkConstants.MASTER);
        args.add(masterUrl);

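        // --deploy-mode only applies when submitting to a cluster manager; local mode runs in-process.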
        if (!onLocal) {
            args.add(SparkConstants.DEPLOY_MODE);
            args.add(deployMode);
        }

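        // --class applies only to JVM programs; PYTHON and SQL submissions carry no main class.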
        ProgramType programType = sparkParameters.getProgramType();
        String mainClass = sparkParameters.getMainClass();
        if (programType != ProgramType.PYTHON && programType != ProgramType.SQL && StringUtils.isNotEmpty(mainClass)) {
            args.add(SparkConstants.MAIN_CLASS);
            args.add(mainClass);
        }

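        // Driver and executor resource options (cores, memory, etc.).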
        populateSparkResourceDefinitions(args);

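        // --name, escaped so that a user-supplied name cannot break the generated command.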
        String appName = sparkParameters.getAppName();
        if (StringUtils.isNotEmpty(appName)) {
            args.add(SparkConstants.SPARK_NAME);
            args.add(ArgsUtils.escape(appName));
        }

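        // Forward the YARN queue unless running locally or the user already set it in the free-form options.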
        String others = sparkParameters.getOthers();
        if (!onLocal
                && (StringUtils.isEmpty(others) || !others.contains(SparkConstants.SPARK_YARN_QUEUE))) {
            String yarnQueue = sparkParameters.getYarnQueue();
            if (StringUtils.isNotEmpty(yarnQueue)) {
                args.add(SparkConstants.SPARK_YARN_QUEUE);
                args.add(yarnQueue);
            }
        }

        // Free-form user-supplied options, e.g. --conf, --files, --jars, --packages
        if (StringUtils.isNotEmpty(others)) {
            args.add(others);
        }

        // Add the driver label and target namespace for Spark on native Kubernetes.
        if (onNativeKubernetes) {
            args.add(String.format(DRIVER_LABEL_CONF, UNIQUE_LABEL_NAME, taskExecutionContext.getTaskAppId()));
            args.add(String.format(SPARK_KUBERNETES_NAMESPACE,
                    JSONUtils.toMap(sparkParameters.getNamespace()).get(NAMESPACE_NAME)));
        }

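        // Resolve the main application file to its absolute path on the local host; SQL tasks have no main jar.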
        ResourceInfo mainJar = sparkParameters.getMainJar();
        if (programType != ProgramType.SQL) {
            ResourceContext resourceContext = taskExecutionContext.getResourceContext();
            args.add(resourceContext.getResourceItem(mainJar.getResourceName()).getResourceAbsolutePathInLocal());
        }

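        // Arguments passed straight through to the user program (not applicable to SQL).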
        String mainArgs = sparkParameters.getMainArgs();
        if (programType != ProgramType.SQL && StringUtils.isNotEmpty(mainArgs)) {
            args.add(mainArgs);
        }

        // bin/spark-sql -f fileName
        if (ProgramType.SQL == programType) {
            String sqlContent = "";
            String resourceFileName = "";
            args.add(SparkConstants.SQL_FROM_FILE);
            if (SparkConstants.TYPE_FILE.equals(sparkParameters.getSqlExecutionType())) {
                final List<ResourceInfo> resourceInfos = sparkParameters.getResourceList();
                if (resourceInfos.size() > 1) {
                    log.warn("more than one file detected, using the first one by default");
                }

                try {
                    resourceFileName = resourceInfos.get(0).getResourceName();
                    ResourceContext resourceContext = taskExecutionContext.getResourceContext();
                    sqlContent = FileUtils.readFileToString(
                            new File(
                                    resourceContext.getResourceItem(resourceFileName).getResourceAbsolutePathInLocal()),
                            StandardCharsets.UTF_8);
                } catch (IOException e) {
                    log.error("failed to read SQL content from file {}", resourceFileName, e);
                    throw new TaskException("failed to read SQL content", e);
                }
            } else {
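                // Inline execution type: take the SQL directly from the task definition's raw script.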
                sqlContent = sparkParameters.getRawScript();
            }
            args.add(generateScriptFile(sqlContent));
        }
        return args;
    }
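
For illustration, a JAVA task submitted to YARN in cluster mode might produce options along these lines once the returned list is joined into a command (all values hypothetical; the resource flags assume populateSparkResourceDefinitions emits the usual spark-submit driver/executor cores and memory options):

    --master yarn --deploy-mode cluster --class com.example.Main
    --driver-cores 1 --driver-memory 512M --num-executors 2
    --executor-cores 2 --executor-memory 2G --name my_spark_task
    --queue default /tmp/dolphinscheduler/exec/my-app.jar arg1 arg2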