public static KuduTableInfo createTableInfo()

in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/utils/KuduTableUtils.java [61:96]


    /**
     * Builds the {@link KuduTableInfo} for the given table name from a Flink schema and the
     * table properties.
     *
     * <p>When the schema declares no primary key, the table info is returned as-is (without any
     * create-if-not-exists configuration) and an informational message is logged. Otherwise the
     * returned table info is configured via {@code createTableIfNotExists} with:
     *
     * <ul>
     *   <li>a column schema factory delegating to {@code toKuduConnectorColumns} with every
     *       schema column (name and data type) and the primary key column names;
     *   <li>a create-options factory that sets the replica count and the hash partitioning.
     * </ul>
     *
     * <p>The replica count and the number of hash partitions are read from {@code props} under
     * the {@code REPLICAS} and {@code HASH_PARTITIONS} keys, falling back to the respective
     * option defaults when the key is absent. Hash partition columns come from {@code
     * getHashColumns(props)}, falling back to the primary key columns.
     *
     * @param tableName name handed to {@code KuduTableInfo.forTable}
     * @param schema resolved Flink schema supplying the columns and the optional primary key
     * @param props table properties consulted for replicas, hash columns and hash partitions
     * @return the table info, configured for creation only if a primary key is defined
     * @throws NumberFormatException if a present replica or hash partition value is not a
     *     parsable integer
     */
    public static KuduTableInfo createTableInfo(
            String tableName, ResolvedSchema schema, Map<String, String> props) {
        final KuduTableInfo info = KuduTableInfo.forTable(tableName);

        // Without a primary key there is nothing to derive a Kudu schema from.
        if (!schema.getPrimaryKey().isPresent()) {
            LOG.info(
                    "Primary key is not defined in the Flink table schema, assuming the table in Kudu already exists.");
            return info;
        }

        final List<Tuple2<String, DataType>> nameTypePairs =
                schema.getColumns().stream()
                        .map(column -> Tuple2.of(column.getName(), column.getDataType()))
                        .collect(Collectors.toList());
        final List<String> keyColumns = schema.getPrimaryKey().get().getColumns();

        // Replication factor: explicit property wins, otherwise the option default.
        final String rawReplicas = props.get(REPLICAS.key());
        final int replicaCount =
                rawReplicas == null ? REPLICAS.defaultValue() : Integer.parseInt(rawReplicas);

        // Hash partitioning defaults to the primary key columns when none are configured.
        final List<String> hashColumns = getHashColumns(props).orElse(keyColumns);

        final String rawHashPartitions = props.get(HASH_PARTITIONS.key());
        final int hashPartitionCount =
                rawHashPartitions == null
                        ? HASH_PARTITIONS.defaultValue()
                        : Integer.parseInt(rawHashPartitions);

        final ColumnSchemasFactory columnSchemas =
                () -> toKuduConnectorColumns(nameTypePairs, keyColumns);
        final CreateTableOptionsFactory createOptions =
                () -> {
                    CreateTableOptions options = new CreateTableOptions();
                    options.setNumReplicas(replicaCount);
                    options.addHashPartitions(hashColumns, hashPartitionCount);
                    return options;
                };

        info.createTableIfNotExists(columnSchemas, createOptions);
        return info;
    }