in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/utils/KuduTableUtils.java [61:96]
public static KuduTableInfo createTableInfo(
String tableName, ResolvedSchema schema, Map<String, String> props) {
KuduTableInfo tableInfo = KuduTableInfo.forTable(tableName);
if (!schema.getPrimaryKey().isPresent()) {
LOG.info(
"Primary key is not defined in the Flink table schema, assuming the table in Kudu already exists.");
return tableInfo;
}
List<Tuple2<String, DataType>> columns =
schema.getColumns().stream()
.map(col -> Tuple2.of(col.getName(), col.getDataType()))
.collect(Collectors.toList());
List<String> primaryKeyCols = schema.getPrimaryKey().get().getColumns();
ColumnSchemasFactory schemasFactory = () -> toKuduConnectorColumns(columns, primaryKeyCols);
int replicas =
Optional.ofNullable(props.get(REPLICAS.key()))
.map(Integer::parseInt)
.orElse(REPLICAS.defaultValue());
List<String> hashCols = getHashColumns(props).orElse(primaryKeyCols);
int hashPartitions =
Optional.ofNullable(props.get(HASH_PARTITIONS.key()))
.map(Integer::parseInt)
.orElse(HASH_PARTITIONS.defaultValue());
CreateTableOptionsFactory optionsFactory =
() ->
new CreateTableOptions()
.setNumReplicas(replicas)
.addHashPartitions(hashCols, hashPartitions);
tableInfo.createTableIfNotExists(schemasFactory, optionsFactory);
return tableInfo;
}