public static KuduTableInfo createTableInfo()

in flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java [55:83]


    /**
     * Builds the {@link KuduTableInfo} for the given table name.
     *
     * <p>Table creation is only configured when primary key information is available, i.e. when
     * {@code props} contains the {@code KUDU_PRIMARY_KEY_COLS} property or {@code schema} declares
     * a primary key. Otherwise the returned info only references the table by name and the table
     * is assumed to exist already.
     *
     * <p>When creation is configured, the number of replicas defaults to 1 and the number of hash
     * partitions defaults to 3 if the corresponding properties are absent. The column schemas and
     * the create-table options are supplied through factories, so the hash columns are only
     * resolved from {@code props} when the options factory is invoked.
     *
     * @param tableName name of the Kudu table
     * @param schema    Flink table schema describing the columns (and optionally the primary key)
     * @param props     connector properties
     * @return table info, configured to create the table if missing when key information is present
     * @throws NumberFormatException if the replicas or hash partition count property is present
     *                               but is not a parsable integer
     */
    public static KuduTableInfo createTableInfo(String tableName, TableSchema schema, Map<String, String> props) {
        // Primary key columns are required to create a Kudu table, so their presence (either as a
        // connector property or as a primary key declared on the schema) decides whether we
        // configure table creation at all.
        boolean createIfMissing = props.containsKey(KUDU_PRIMARY_KEY_COLS.key()) || schema.getPrimaryKey().isPresent();
        KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName);

        if (createIfMissing) {

            List<Tuple2<String, DataType>> columns = getSchemaWithSqlTimestamp(schema)
                    .getTableColumns()
                    .stream()
                    .map(tc -> Tuple2.of(tc.getName(), tc.getType()))
                    .collect(Collectors.toList());

            List<String> keyColumns = getPrimaryKeyColumns(props, schema);
            ColumnSchemasFactory schemasFactory = () -> toKuduConnectorColumns(columns, keyColumns);
            // Default to a single replica when the property is not set.
            int replicas = Optional.ofNullable(props.get(KUDU_REPLICAS.key())).map(Integer::parseInt).orElse(1);
            // Default to 3 hash partitions when the property is not set.
            int hashPartitionNums =
                    Optional.ofNullable(props.get(KUDU_HASH_PARTITION_NUMS.key())).map(Integer::parseInt).orElse(3);
            // The hash columns are looked up lazily, when the options factory is invoked.
            CreateTableOptionsFactory optionsFactory = () -> new CreateTableOptions()
                    .setNumReplicas(replicas)
                    .addHashPartitions(getHashColumns(props), hashPartitionNums);
            tableInfo.createTableIfNotExists(schemasFactory, optionsFactory);
        } else {
            LOG.debug(
                    "Property {} is missing and the schema declares no primary key, "
                            + "assuming the table is already created.",
                    KUDU_PRIMARY_KEY_COLS.key());
        }

        return tableInfo;
    }