in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/catalog/KuduCatalog.java [242:265]
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException {
checkNotNull(tablePath, "Table path must be provided.");
checkNotNull(table, "Table must be provided.");
checkArgument(table instanceof ResolvedCatalogBaseTable, "Table must be resolved.");
Map<String, String> tableProperties = table.getOptions();
ResolvedSchema schema = ((ResolvedCatalogBaseTable<?>) table).getResolvedSchema();
Set<String> optionalProperties =
ImmutableSet.of(REPLICAS.key(), HASH_PARTITIONS.key(), HASH_COLS.key());
if (!optionalProperties.containsAll(tableProperties.keySet())) {
throw new CatalogException(
"Unknown properties were given. The following properties are allowed:"
+ optionalProperties);
}
String tableName = tablePath.getObjectName();
KuduTableInfo tableInfo =
KuduTableUtils.createTableInfo(tableName, schema, tableProperties);
createTable(tableInfo, ignoreIfExists);
}