in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java [143:229]
public static TableDescriptor toFlussTable(ResolvedCatalogTable catalogTable) {
    Configuration flinkTableConf = Configuration.fromMap(catalogTable.getOptions());
    String connector = flinkTableConf.get(CONNECTOR);
    if (!StringUtils.isNullOrWhitespaceOnly(connector)
            && !FlinkCatalogFactory.IDENTIFIER.equals(connector)) {
        throw new CatalogException(
                "Fluss Catalog only supports Fluss tables,"
                        + " but 'connector' = '"
                        + connector
                        + "' was specified while using Fluss Catalog.\n"
                        + "You can create a TEMPORARY table instead if you want to use another connector.");
    }
    ResolvedSchema resolvedSchema = catalogTable.getResolvedSchema();
    // now, build the Fluss table schema
    Schema.Builder schemaBuilder = Schema.newBuilder();
    if (resolvedSchema.getPrimaryKey().isPresent()) {
        schemaBuilder.primaryKey(resolvedSchema.getPrimaryKey().get().getColumns());
    }
    // first build the schema with physical columns only
    Schema schema =
            schemaBuilder
                    .fromColumns(
                            resolvedSchema.getColumns().stream()
                                    .filter(Column::isPhysical)
                                    .map(
                                            column ->
                                                    new Schema.Column(
                                                            column.getName(),
                                                            FlinkConversions.toFlussType(
                                                                    column.getDataType()),
                                                            column.getComment().orElse(null)))
                                    .collect(Collectors.toList()))
                    .build();
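    // Fluss tables have no notion of Flink metadata columns, so reject them up front.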
    resolvedSchema.getColumns().stream()
            .filter(col -> col instanceof Column.MetadataColumn)
            .findAny()
            .ifPresent(
                    (col) -> {
                        throw new CatalogException(
                                "Metadata column " + col + " is not supported.");
                    });
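    // Computed columns and watermark specs are dropped from the physical Fluss schema;
    // serialize them into custom properties so the Flink catalog can restore them later.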
    Map<String, String> customProperties = flinkTableConf.toMap();
    CatalogPropertiesUtils.serializeComputedColumns(
            customProperties, resolvedSchema.getColumns());
    CatalogPropertiesUtils.serializeWatermarkSpecs(
            customProperties, catalogTable.getResolvedSchema().getWatermarkSpecs());
    String comment = catalogTable.getComment();
    // convert some Flink options to Fluss table configs
    Map<String, String> properties = convertFlinkOptionsToFlussTableProperties(flinkTableConf);
    // then set the distribution (bucketing) information
    List<String> bucketKey;
    if (flinkTableConf.containsKey(BUCKET_KEY.key())) {
        bucketKey =
                Arrays.stream(flinkTableConf.get(BUCKET_KEY).split(","))
                        .map(String::trim)
                        .collect(Collectors.toList());
    } else {
        // default bucket key: primary key columns minus partition keys
        bucketKey =
                schema.getPrimaryKey()
                        .map(
                                pk -> {
                                    List<String> bucketKeys =
                                            new ArrayList<>(pk.getColumnNames());
                                    bucketKeys.removeAll(catalogTable.getPartitionKeys());
                                    return bucketKeys;
                                })
                        .orElse(Collections.emptyList());
    }
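    // bucket count is optional; null simply means it was not configured explicitly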
    Integer bucketNum = flinkTableConf.getOptional(BUCKET_NUMBER).orElse(null);
    return TableDescriptor.builder()
            .schema(schema)
            .partitionedBy(catalogTable.getPartitionKeys())
            .distributedBy(bucketNum, bucketKey)
            .comment(comment)
            .properties(properties)
            .customProperties(customProperties)
            .build();
}
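
// Usage sketch (not part of this file; the variable name is illustrative): given a
// ResolvedCatalogTable from Flink's CREATE TABLE path, the conversion is a single call:
//
//     TableDescriptor descriptor = FlinkConversions.toFlussTable(resolvedCatalogTable);
//
// The resulting descriptor carries the schema, partition keys, bucket distribution,
// table properties, and custom properties, and can then be handed to Fluss to
// create the physical table.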