private void checkConfigIntegrity()

in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java [107:187]


    private void checkConfigIntegrity(JobVersion version, JobTaskInfo jobTaskInfo) {
        if (StringUtils.isEmpty(version.getEnv())) {
            throw new SeatunnelException(
                    SeatunnelErrorEnum.ERROR_CONFIG,
                    "job env can't be empty, please change config");
        }
        Map<String, PluginConfig> pluginConfigMap =
                jobTaskInfo.getPlugins().stream()
                        .collect(Collectors.toMap(PluginConfig::getPluginId, Function.identity()));

        List<String> allPluginIdsFromEdge =
                Stream.concat(
                                jobTaskInfo.getEdges().stream().map(Edge::getInputPluginId),
                                jobTaskInfo.getEdges().stream().map(Edge::getTargetPluginId))
                        .collect(Collectors.toList());

        jobTaskInfo.getPlugins().stream()
                .filter(p -> !allPluginIdsFromEdge.contains(p.getPluginId()))
                .findAny()
                .ifPresent(
                        p -> {
                            throw new SeatunnelException(
                                    SeatunnelErrorEnum.ERROR_CONFIG,
                                    "plugin '" + p.getName() + "' is not used in any edge");
                        });

        jobTaskInfo
                .getEdges()
                .forEach(
                        e -> {
                            jobTaskInfo.getPlugins().stream()
                                    .filter(
                                            p ->
                                                    Objects.equals(
                                                            p.getPluginId(), e.getInputPluginId()))
                                    .findFirst()
                                    .orElseThrow(
                                            () ->
                                                    new SeatunnelException(
                                                            SeatunnelErrorEnum.ERROR_CONFIG,
                                                            "input plugin not found"));
                            jobTaskInfo.getPlugins().stream()
                                    .filter(
                                            p ->
                                                    Objects.equals(
                                                            p.getPluginId(), e.getTargetPluginId()))
                                    .findFirst()
                                    .orElseThrow(
                                            () ->
                                                    new SeatunnelException(
                                                            SeatunnelErrorEnum.ERROR_CONFIG,
                                                            "target plugin not found"));
                        });

        List<String> inputTransformId =
                jobTaskInfo.getEdges().stream()
                        .map(Edge::getInputPluginId)
                        .filter(e -> pluginConfigMap.get(e).getType().equals(PluginType.TRANSFORM))
                        .collect(Collectors.toList());
        List<String> targetTransformId =
                jobTaskInfo.getEdges().stream()
                        .map(Edge::getTargetPluginId)
                        .filter(e -> pluginConfigMap.get(e).getType().equals(PluginType.TRANSFORM))
                        .collect(Collectors.toList());

        if (!new HashSet<>(inputTransformId).containsAll(targetTransformId)
                || !new HashSet<>(targetTransformId).containsAll(inputTransformId)) {
            throw new SeatunnelException(
                    SeatunnelErrorEnum.ERROR_CONFIG, "transform plugin must be connected");
        }

        for (Edge edge : jobTaskInfo.getEdges()) {
            if (!pluginTypeMatch(
                    pluginConfigMap.get(edge.getInputPluginId()).getType(),
                    pluginConfigMap.get(edge.getTargetPluginId()).getType())) {
                throw new SeatunnelException(
                        SeatunnelErrorEnum.ERROR_CONFIG,
                        "plugin line not match, please check plugin line");
            }
        }
    }