in flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java [316:398]
public static Schema strictlyMergeSchemas(List<Schema> schemas) {
Preconditions.checkArgument(
!schemas.isEmpty(), "Trying to merge transformed schemas %s, but got empty list");
if (schemas.size() == 1) {
return schemas.get(0);
}
List<List<String>> primaryKeys =
schemas.stream()
.map(Schema::primaryKeys)
.filter(p -> !p.isEmpty())
.distinct()
.collect(Collectors.toList());
List<List<String>> partitionKeys =
schemas.stream()
.map(Schema::partitionKeys)
.filter(p -> !p.isEmpty())
.distinct()
.collect(Collectors.toList());
List<Map<String, String>> options =
schemas.stream()
.map(Schema::options)
.filter(p -> !p.isEmpty())
.distinct()
.collect(Collectors.toList());
List<List<String>> columnNames =
schemas.stream()
.map(Schema::getColumnNames)
.distinct()
.collect(Collectors.toList());
Preconditions.checkArgument(
primaryKeys.size() <= 1,
"Trying to merge transformed schemas %s, but got more than one primary key configurations: %s",
schemas,
primaryKeys);
Preconditions.checkArgument(
partitionKeys.size() <= 1,
"Trying to merge transformed schemas %s, but got more than one partition key configurations: %s",
schemas,
partitionKeys);
Preconditions.checkArgument(
options.size() <= 1,
"Trying to merge transformed schemas %s, but got more than one option configurations: %s",
schemas,
options);
Preconditions.checkArgument(
columnNames.size() == 1,
"Trying to merge transformed schemas %s, but got more than one column name views: %s",
schemas,
columnNames);
int arity = columnNames.get(0).size();
ArrayListMultimap<Integer, DataType> toBeMergedColumnTypes =
ArrayListMultimap.create(arity, 1);
for (Schema schema : schemas) {
List<DataType> columnTypes = schema.getColumnDataTypes();
for (int colIndex = 0; colIndex < columnTypes.size(); colIndex++) {
toBeMergedColumnTypes.put(colIndex, columnTypes.get(colIndex));
}
}
List<String> mergedColumnNames = columnNames.iterator().next();
List<DataType> mergedColumnTypes = new ArrayList<>(arity);
for (int i = 0; i < arity; i++) {
mergedColumnTypes.add(strictlyMergeDataTypes(toBeMergedColumnTypes.get(i)));
}
List<Column> mergedColumns = new ArrayList<>();
for (int i = 0; i < mergedColumnNames.size(); i++) {
mergedColumns.add(
Column.physicalColumn(mergedColumnNames.get(i), mergedColumnTypes.get(i)));
}
return Schema.newBuilder()
.primaryKey(primaryKeys.isEmpty() ? Collections.emptyList() : primaryKeys.get(0))
.partitionKey(
partitionKeys.isEmpty() ? Collections.emptyList() : partitionKeys.get(0))
.options(options.isEmpty() ? Collections.emptyMap() : options.get(0))
.setColumns(mergedColumns)
.build();
}