public static TableDescriptor toFlussTable()

in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java [143:229]


    /**
     * Converts a Flink {@link ResolvedCatalogTable} into a Fluss {@link TableDescriptor}.
     *
     * <p>Only the physical columns of the resolved schema become columns of the Fluss schema; the
     * primary key, if any, is carried over as-is. All Flink table options are copied into the
     * custom properties, and that map is additionally handed to {@code CatalogPropertiesUtils}
     * together with the resolved columns and the watermark specs.
     *
     * <p>The bucket key is taken from the {@code BUCKET_KEY} option (comma separated, each entry
     * trimmed) when that option is set. Otherwise it defaults to the primary key columns minus the
     * partition keys, or to an empty list when the table has no primary key. The bucket number is
     * {@code null} when the {@code BUCKET_NUMBER} option is absent.
     *
     * @param catalogTable the resolved Flink table to convert
     * @return the Fluss table descriptor built from the given table
     * @throws CatalogException if the {@code connector} option names something other than the
     *     Fluss catalog identifier, or if the schema contains a metadata column
     */
    public static TableDescriptor toFlussTable(ResolvedCatalogTable catalogTable) {
        Configuration options = Configuration.fromMap(catalogTable.getOptions());

        // An absent or blank connector is fine; anything else has to be the Fluss identifier.
        String connectorName = options.get(CONNECTOR);
        boolean connectorAccepted =
                StringUtils.isNullOrWhitespaceOnly(connectorName)
                        || FlinkCatalogFactory.IDENTIFIER.equals(connectorName);
        if (!connectorAccepted) {
            throw new CatalogException(
                    "Fluss Catalog only supports fluss tables,"
                            + " but you specify  'connector'= '"
                            + connectorName
                            + "' when using Fluss Catalog\n"
                            + " You can create TEMPORARY table instead if you want to create the table of other connector.");
        }

        ResolvedSchema flinkSchema = catalogTable.getResolvedSchema();

        // Assemble the Fluss schema: primary key first, then the physical columns.
        Schema.Builder schemaBuilder = Schema.newBuilder();
        flinkSchema
                .getPrimaryKey()
                .ifPresent(primaryKey -> schemaBuilder.primaryKey(primaryKey.getColumns()));

        List<Schema.Column> physicalColumns = new ArrayList<>();
        for (Column flinkColumn : flinkSchema.getColumns()) {
            if (!flinkColumn.isPhysical()) {
                continue;
            }
            physicalColumns.add(
                    new Schema.Column(
                            flinkColumn.getName(),
                            FlinkConversions.toFlussType(flinkColumn.getDataType()),
                            flinkColumn.getComment().orElse(null)));
        }
        Schema flussSchema = schemaBuilder.fromColumns(physicalColumns).build();

        // Metadata columns cannot be represented, so reject the table if one is declared.
        flinkSchema.getColumns().stream()
                .filter(Column.MetadataColumn.class::isInstance)
                .findAny()
                .ifPresent(
                        metadataColumn -> {
                            throw new CatalogException(
                                    "Metadata column " + metadataColumn + " is not supported.");
                        });

        // Custom properties start out as the raw Flink options; computed columns and watermark
        // specs are passed to CatalogPropertiesUtils along with this map.
        Map<String, String> customProps = options.toMap();
        CatalogPropertiesUtils.serializeComputedColumns(customProps, flinkSchema.getColumns());
        CatalogPropertiesUtils.serializeWatermarkSpecs(
                customProps, flinkSchema.getWatermarkSpecs());

        String tableComment = catalogTable.getComment();

        // Flink options that correspond to Fluss table configs.
        Map<String, String> tableProps = convertFlinkOptionsToFlussTableProperties(options);

        // Distribution: an explicit bucket key wins, otherwise derive it from the primary key.
        List<String> partitionKeys = catalogTable.getPartitionKeys();
        List<String> bucketKey;
        if (!options.containsKey(BUCKET_KEY.key())) {
            // primary key columns that are not partition keys; empty without a primary key
            bucketKey =
                    flussSchema
                            .getPrimaryKey()
                            .map(
                                    primaryKey -> {
                                        List<String> remaining =
                                                new ArrayList<>(primaryKey.getColumnNames());
                                        remaining.removeAll(partitionKeys);
                                        return remaining;
                                    })
                            .orElse(Collections.emptyList());
        } else {
            bucketKey = new ArrayList<>();
            for (String rawKey : options.get(BUCKET_KEY).split(",")) {
                bucketKey.add(rawKey.trim());
            }
        }
        Integer bucketCount = options.getOptional(BUCKET_NUMBER).orElse(null);

        return TableDescriptor.builder()
                .schema(flussSchema)
                .partitionedBy(partitionKeys)
                .distributedBy(bucketCount, bucketKey)
                .comment(tableComment)
                .properties(tableProps)
                .customProperties(customProps)
                .build();
    }