in flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/utils/KuduTableUtils.java [55:83]
/**
 * Builds the {@link KuduTableInfo} for {@code tableName} and, when a primary key is known,
 * configures it to create the table if it does not exist yet.
 *
 * <p>Table creation is configured when {@code props} contains the {@code KUDU_PRIMARY_KEY_COLS}
 * key or when {@code schema} declares a primary key. In that case:
 * <ul>
 *   <li>the columns are taken from {@code getSchemaWithSqlTimestamp(schema)};</li>
 *   <li>the key columns come from {@code getPrimaryKeyColumns(props, schema)};</li>
 *   <li>the replica count is read from {@code KUDU_REPLICAS} (default {@code 1});</li>
 *   <li>the number of hash partitions is read from {@code KUDU_HASH_PARTITION_NUMS}
 *       (default {@code 3});</li>
 *   <li>the hash partition columns come from {@code getHashColumns(props)}.</li>
 * </ul>
 * Otherwise the returned info only references the table by name.
 *
 * @param tableName name of the Kudu table
 * @param schema    table schema; consulted for columns and for a declared primary key
 * @param props     table properties
 * @return the table info, with creation configured only if a primary key could be determined
 * @throws NumberFormatException if the replicas or hash-partition-number property is present
 *                               but is not a parsable integer
 */
public static KuduTableInfo createTableInfo(String tableName, TableSchema schema, Map<String, String> props) {
    // A primary key is required for table creation, so its presence (either as a property or
    // declared on the schema) is used to infer whether the table should be created.
    boolean createIfMissing =
            props.containsKey(KUDU_PRIMARY_KEY_COLS.key()) || schema.getPrimaryKey().isPresent();
    KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName);

    if (createIfMissing) {
        List<Tuple2<String, DataType>> columns = getSchemaWithSqlTimestamp(schema)
                .getTableColumns()
                .stream()
                .map(tc -> Tuple2.of(tc.getName(), tc.getType()))
                .collect(Collectors.toList());
        List<String> keyColumns = getPrimaryKeyColumns(props, schema);
        ColumnSchemasFactory schemasFactory = () -> toKuduConnectorColumns(columns, keyColumns);

        int replicas = Optional.ofNullable(props.get(KUDU_REPLICAS.key()))
                .map(Integer::parseInt)
                .orElse(1);
        // Defaults to 3 hash partitions when the property is not set.
        int hashPartitionNums = Optional.ofNullable(props.get(KUDU_HASH_PARTITION_NUMS.key()))
                .map(Integer::parseInt)
                .orElse(3);

        // The hash columns are deliberately resolved inside the factory, i.e. only when the
        // options are actually requested, not while building the table info.
        // NOTE(review): whether getHashColumns tolerates a missing KUDU_HASH_COLS property is
        // not visible here - confirm, since creation is now triggered by the primary key alone.
        CreateTableOptionsFactory optionsFactory = () -> new CreateTableOptions()
                .setNumReplicas(replicas)
                .addHashPartitions(getHashColumns(props), hashPartitionNums);

        tableInfo.createTableIfNotExists(schemasFactory, optionsFactory);
    } else {
        // Report the property that actually drives the decision above (previously this named
        // KUDU_HASH_COLS, which is not what the condition checks).
        LOG.debug(
                "Property {} is missing and the schema declares no primary key, "
                        + "assuming the table is already created.",
                KUDU_PRIMARY_KEY_COLS.key());
    }
    return tableInfo;
}