public String generateJobConfig()

in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java [182:362]


    public String generateJobConfig(
            Long jobId,
            List<JobTask> tasks,
            List<JobLine> lines,
            String envStr,
            JobExecParam executeParam) {
        checkSceneMode(tasks);
        BusinessMode businessMode =
                BusinessMode.valueOf(jobDefinitionDao.getJob(jobId).getJobType());
        Config envConfig = filterEmptyValue(ConfigFactory.parseString(envStr));
        JobUtils.updateDataSource(executeParam, tasks);

        Map<String, List<Config>> sourceMap = new LinkedHashMap<>();
        Map<String, List<Config>> transformMap = new LinkedHashMap<>();
        Map<String, List<Config>> sinkMap = new LinkedHashMap<>();
        Map<String, JobLine> inputLines =
                lines.stream()
                        .collect(
                                Collectors.toMap(
                                        JobLine::getInputPluginId,
                                        Function.identity(),
                                        (existing, replacement) -> existing));
        Map<String, JobLine> targetLines =
                lines.stream()
                        .collect(Collectors.toMap(JobLine::getTargetPluginId, Function.identity()));

        for (JobTask task : tasks) {
            PluginType pluginType = PluginType.valueOf(task.getType().toUpperCase(Locale.ROOT));
            try {
                String pluginId = task.getPluginId();
                OptionRule optionRule =
                        connectorCache.getOptionRule(pluginType.getType(), task.getConnectorType());
                Config config =
                        filterEmptyValue(
                                parseConfigWithOptionRule(
                                        pluginType,
                                        task.getConnectorType(),
                                        task.getConfig(),
                                        optionRule));
                switch (pluginType) {
                    case SOURCE:
                        if (inputLines.containsKey(pluginId)) {
                            config =
                                    addTableName(
                                            CommonOptions.RESULT_TABLE_NAME.key(),
                                            inputLines.get(pluginId),
                                            config);
                            if (!sourceMap.containsKey(task.getConnectorType())) {
                                sourceMap.put(task.getConnectorType(), new ArrayList<>());
                            }

                            if (businessMode.equals(BusinessMode.DATA_REPLICA)) {
                                config =
                                        config.withValue(
                                                DAG_PARSING_MODE,
                                                ConfigValueFactory.fromAnyRef(
                                                        ParsingMode.MULTIPLEX.name()));
                            }

                            if (task.getSceneMode()
                                    .toUpperCase()
                                    .equals(SceneMode.SPLIT_TABLE.name())) {
                                config =
                                        config.withValue(
                                                DAG_PARSING_MODE,
                                                ConfigValueFactory.fromAnyRef(
                                                        ParsingMode.SHARDING.name()));
                            }

                            Config mergeConfig =
                                    mergeTaskConfig(
                                            task,
                                            pluginType,
                                            task.getConnectorType(),
                                            businessMode,
                                            config,
                                            optionRule);
                            sourceMap
                                    .get(task.getConnectorType())
                                    .add(filterEmptyValue(mergeConfig));
                        }
                        break;
                    case TRANSFORM:
                        if (!inputLines.containsKey(pluginId)
                                && !targetLines.containsKey(pluginId)) {
                            break;
                        }
                        if (inputLines.containsKey(pluginId)) {
                            config =
                                    addTableName(
                                            CommonOptions.RESULT_TABLE_NAME.key(),
                                            inputLines.get(pluginId),
                                            config);
                        }
                        if (targetLines.containsKey(pluginId)) {
                            config =
                                    addTableName(
                                            CommonOptions.SOURCE_TABLE_NAME.key(),
                                            targetLines.get(pluginId),
                                            config);
                        }
                        if (!transformMap.containsKey(task.getConnectorType())) {
                            transformMap.put(task.getConnectorType(), new ArrayList<>());
                        }
                        List<TableSchemaReq> inputSchemas = findInputSchemas(tasks, lines, task);
                        Config transformConfig = buildTransformConfig(task, config, inputSchemas);
                        transformMap
                                .get(task.getConnectorType())
                                .add(filterEmptyValue(transformConfig));
                        break;
                    case SINK:
                        if (targetLines.containsKey(pluginId)) {
                            config =
                                    addTableName(
                                            CommonOptions.SOURCE_TABLE_NAME.key(),
                                            targetLines.get(pluginId),
                                            config);
                            if (!sinkMap.containsKey(task.getConnectorType())) {
                                sinkMap.put(task.getConnectorType(), new ArrayList<>());
                            }
                            Config mergeConfig =
                                    mergeTaskConfig(
                                            task,
                                            pluginType,
                                            task.getConnectorType(),
                                            businessMode,
                                            config,
                                            optionRule);

                            sinkMap.get(task.getConnectorType()).add(filterEmptyValue(mergeConfig));
                        }
                        break;
                    default:
                        throw new SeatunnelException(
                                SeatunnelErrorEnum.UNSUPPORTED_CONNECTOR_TYPE,
                                task.getType().toUpperCase());
                }
            } catch (SeatunnelException e) {
                log.error(ExceptionUtils.getMessage(e));
                throw e;
            } catch (Exception e) {
                throw new SeatunnelException(
                        SeatunnelErrorEnum.ERROR_CONFIG,
                        String.format(
                                "Plugin Type: %s, Connector Type: %s, Error Info: %s",
                                pluginType, task.getConnectorType(), ExceptionUtils.getMessage(e)));
            }
        }
        String sources = "";
        if (sourceMap.size() > 0) {
            sources = getConnectorConfig(sourceMap);
        }

        String transforms = "";
        if (transformMap.size() > 0) {
            transforms = getConnectorConfig(transformMap);
        }

        String sinks = "";
        if (sinkMap.size() > 0) {
            sinks = getConnectorConfig(sinkMap);
        }

        if (!encryptionConfig.getType().equals(ENCRYPTION_TYPE_NONE)) {
            envConfig =
                    envConfig.withValue(
                            ENCRYPTION_IDENTIFIER_KEY,
                            ConfigValueFactory.fromAnyRef(encryptionConfig.getType()));
        }

        String env =
                envConfig
                        .root()
                        .render(
                                ConfigRenderOptions.defaults()
                                        .setJson(false)
                                        .setComments(false)
                                        .setOriginComments(false));
        String jobConfig = SeaTunnelConfigUtil.generateConfig(env, sources, transforms, sinks);
        return JobUtils.replaceJobConfigPlaceholders(jobConfig, executeParam);
    }