public static Schema strictlyMergeSchemas()

in flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java [316:398]


    /**
     * Merges a non-empty list of schemas that are expected to describe the same table layout
     * into one schema.
     *
     * <p>A single-element list is returned as-is. Otherwise the following rules apply:
     *
     * <ul>
     *   <li>Empty primary key lists, partition key lists and option maps are ignored; among the
     *       non-empty ones, at most one distinct value of each kind may be present.
     *   <li>All schemas must expose exactly the same list of column names (same names, same
     *       order).
     *   <li>The data types found at each column position are merged with {@code
     *       strictlyMergeDataTypes}.
     * </ul>
     *
     * <p>Every column of the result is created as a physical column from the shared column name
     * and the merged data type only.
     *
     * @param schemas schemas to merge, must not be empty
     * @return the only schema if a single one was given, otherwise a newly built merged schema
     * @throws IllegalArgumentException if {@code schemas} is empty, or if the schemas disagree on
     *     primary keys, partition keys, options or column names
     */
    public static Schema strictlyMergeSchemas(List<Schema> schemas) {
        // The template carries a %s placeholder, so the argument must be supplied; otherwise the
        // raw placeholder leaks into the exception message.
        Preconditions.checkArgument(
                !schemas.isEmpty(),
                "Trying to merge transformed schemas %s, but got empty list",
                schemas);
        if (schemas.size() == 1) {
            return schemas.get(0);
        }

        // Collect the distinct non-empty configurations; an empty one means "not specified" and
        // never conflicts with the others.
        List<List<String>> primaryKeys =
                schemas.stream()
                        .map(Schema::primaryKeys)
                        .filter(p -> !p.isEmpty())
                        .distinct()
                        .collect(Collectors.toList());
        List<List<String>> partitionKeys =
                schemas.stream()
                        .map(Schema::partitionKeys)
                        .filter(p -> !p.isEmpty())
                        .distinct()
                        .collect(Collectors.toList());
        List<Map<String, String>> options =
                schemas.stream()
                        .map(Schema::options)
                        .filter(p -> !p.isEmpty())
                        .distinct()
                        .collect(Collectors.toList());
        // Column names are not filtered: an empty column list must match the others as well.
        List<List<String>> columnNames =
                schemas.stream()
                        .map(Schema::getColumnNames)
                        .distinct()
                        .collect(Collectors.toList());

        Preconditions.checkArgument(
                primaryKeys.size() <= 1,
                "Trying to merge transformed schemas %s, but got more than one primary key configurations: %s",
                schemas,
                primaryKeys);
        Preconditions.checkArgument(
                partitionKeys.size() <= 1,
                "Trying to merge transformed schemas %s, but got more than one partition key configurations: %s",
                schemas,
                partitionKeys);
        Preconditions.checkArgument(
                options.size() <= 1,
                "Trying to merge transformed schemas %s, but got more than one option configurations: %s",
                schemas,
                options);
        Preconditions.checkArgument(
                columnNames.size() == 1,
                "Trying to merge transformed schemas %s, but got more than one column name views: %s",
                schemas,
                columnNames);

        List<String> mergedColumnNames = columnNames.get(0);
        int arity = mergedColumnNames.size();

        // Group the data types by column position. Each position receives one entry per schema,
        // so the per-key capacity is the number of schemas.
        ArrayListMultimap<Integer, DataType> toBeMergedColumnTypes =
                ArrayListMultimap.create(arity, schemas.size());
        for (Schema schema : schemas) {
            List<DataType> columnTypes = schema.getColumnDataTypes();
            for (int colIndex = 0; colIndex < columnTypes.size(); colIndex++) {
                toBeMergedColumnTypes.put(colIndex, columnTypes.get(colIndex));
            }
        }

        List<Column> mergedColumns = new ArrayList<>(arity);
        for (int i = 0; i < arity; i++) {
            DataType mergedType = strictlyMergeDataTypes(toBeMergedColumnTypes.get(i));
            mergedColumns.add(Column.physicalColumn(mergedColumnNames.get(i), mergedType));
        }

        return Schema.newBuilder()
                .primaryKey(primaryKeys.isEmpty() ? Collections.emptyList() : primaryKeys.get(0))
                .partitionKey(
                        partitionKeys.isEmpty() ? Collections.emptyList() : partitionKeys.get(0))
                .options(options.isEmpty() ? Collections.emptyMap() : options.get(0))
                .setColumns(mergedColumns)
                .build();
    }