private JobTaskCheckRes checkNextTaskSchema()

in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java [316:409]


    /**
     * Checks that the source fields referenced by the transform options of {@code nextConfig}
     * exist in the first output schema of {@code config}, then repeats the check for every plugin
     * downstream of {@code nextConfig}.
     *
     * <p>The field check is skipped when {@code nextConfig} has no transform options, or when no
     * checkable option could be collected from them (for example for {@code JSONPATH}).
     *
     * @param config the upstream plugin whose first output schema supplies the known field names
     * @param nextConfig the plugin whose transform options are validated
     * @param pluginMap lookup used to resolve the ids found in {@code edgeMap} to plugin configs
     * @param edgeMap for a plugin id, the ids of the plugins that are visited next
     * @return a failed {@link JobTaskCheckRes} carrying a {@code MISS_FIELD} schema error for the
     *     first unknown source field found, or {@code null} when no problem was detected
     * @throws IOException declared by the original signature; presumably raised while the
     *     transform options are parsed (the parsing helper is not visible here)
     * @throws SeatunnelException if transform options are present for a transform type that this
     *     check does not support
     */
    private JobTaskCheckRes checkNextTaskSchema(
            PluginConfig config,
            PluginConfig nextConfig,
            Map<String, PluginConfig> pluginMap,
            Map<String, List<String>> edgeMap)
            throws IOException {
        Map<String, Object> options = nextConfig.getTransformOptions();
        if (options != null && !options.isEmpty()) {
            List<TransformOption> transformOptions =
                    collectSchemaCheckOptions(nextConfig, options);
            if (!transformOptions.isEmpty()) {
                JobTaskCheckRes missingField =
                        findMissingSourceField(config, nextConfig, transformOptions);
                if (missingField != null) {
                    return missingField;
                }
            }
        }

        // Walk further down the graph: nextConfig becomes the upstream side of each of its edges.
        List<String> nextConfigIds = edgeMap.get(nextConfig.getPluginId());
        if (nextConfigIds != null) {
            for (String nextConfigId : nextConfigIds) {
                JobTaskCheckRes res =
                        checkNextTaskSchema(
                                nextConfig, pluginMap.get(nextConfigId), pluginMap, edgeMap);
                if (res != null) {
                    return res;
                }
            }
        }
        return null;
    }

    /** Collects the per-field transform options of {@code nextConfig} that need a schema check. */
    private List<TransformOption> collectSchemaCheckOptions(
            PluginConfig nextConfig, Map<String, Object> options) throws IOException {
        // Locale.ROOT keeps the enum lookup stable on JVMs whose default locale has special
        // casing rules (e.g. Turkish, where "i" upper-cases to a dotted capital I).
        Transform transform =
                Transform.valueOf(
                        nextConfig.getConnectorType().toUpperCase(java.util.Locale.ROOT));
        String transformOptionsStr = JsonUtils.toJsonString(options);

        // NOTE(review): fillTransformOptions is presumably appending to this list - confirm.
        List<TransformOption> transformOptions = new ArrayList<>();

        switch (transform) {
            case FIELDMAPPER:
                FieldMapperTransformOptions fieldMapperTransformOptions =
                        getTransformOption(transform, transformOptionsStr);
                if (fieldMapperTransformOptions != null) {
                    fillTransformOptions(
                            transformOptions, fieldMapperTransformOptions.getRenameFields());
                    fillTransformOptions(
                            transformOptions, fieldMapperTransformOptions.getChangeOrders());
                }
                break;
            case MULTIFIELDSPLIT:
                SplitTransformOptions splitTransformOptions =
                        getTransformOption(transform, transformOptionsStr);
                if (splitTransformOptions != null) {
                    fillTransformOptions(transformOptions, splitTransformOptions.getSplits());
                }
                break;
            case COPY:
                CopyTransformOptions copyTransformOptions =
                        getTransformOption(transform, transformOptionsStr);
                if (copyTransformOptions != null) {
                    fillTransformOptions(transformOptions, copyTransformOptions.getCopyList());
                }
                break;
            case SQL:
                SQLTransformOptions sqlTransformOptions =
                        getTransformOption(transform, transformOptionsStr);
                if (sqlTransformOptions != null) {
                    fillTransformOptions(
                            transformOptions,
                            Collections.singletonList(sqlTransformOptions.getSql()));
                }
                break;
            case JSONPATH:
                // No options are collected for this type, so no field check happens for it.
                break;
            case FILTERROWKIND:
            case REPLACE:
            default:
                throw new SeatunnelException(
                        SeatunnelErrorEnum.UNSUPPORTED_CONNECTOR_TYPE,
                        "Unsupported Transform Option " + transform);
        }
        return transformOptions;
    }

    /** Returns a failed check result for the first option whose source field is unknown. */
    private JobTaskCheckRes findMissingSourceField(
            PluginConfig config, PluginConfig nextConfig, List<TransformOption> transformOptions) {
        // Only the first output schema of the upstream plugin is consulted.
        DatabaseTableSchemaReq databaseTableSchemaReq = config.getOutputSchema().get(0);
        List<String> fields =
                databaseTableSchemaReq.getFields().stream()
                        .map(TableField::getName)
                        .collect(Collectors.toList());

        for (TransformOption option : transformOptions) {
            String sourceFieldName = option.getSourceFieldName();
            if (!fields.contains(sourceFieldName)) {
                return new JobTaskCheckRes(
                        false,
                        nextConfig.getPluginId(),
                        new SchemaError(
                                databaseTableSchemaReq.getDatabase(),
                                databaseTableSchemaReq.getTableName(),
                                sourceFieldName,
                                SchemaErrorType.MISS_FIELD),
                        null);
            }
        }
        return null;
    }