in flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/catalog/KuduDynamicCatalog.java [218:246]
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException {
Map<String, String> tableProperties = table.getOptions();
TableSchema tableSchema = table.getSchema();
Set<String> optionalProperties = new HashSet<>(Arrays.asList(KUDU_REPLICAS.key(),
KUDU_HASH_PARTITION_NUMS.key(), KUDU_HASH_COLS.key()));
Set<String> requiredProperties = new HashSet<>();
if (!tableSchema.getPrimaryKey().isPresent()) {
requiredProperties.add(KUDU_PRIMARY_KEY_COLS.key());
}
if (!tableProperties.keySet().containsAll(requiredProperties)) {
throw new CatalogException("Missing required property. The following properties must be provided: " +
requiredProperties.toString());
}
Set<String> permittedProperties = Sets.union(requiredProperties, optionalProperties);
if (!permittedProperties.containsAll(tableProperties.keySet())) {
throw new CatalogException("Unpermitted properties were given. The following properties are allowed:" +
permittedProperties.toString());
}
String tableName = tablePath.getObjectName();
KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(tableName, tableSchema, tableProperties);
createTable(tableInfo, ignoreIfExists);
}