in flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/KuduCatalog.java [221:248]
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException {
Map<String, String> tableProperties = table.getOptions();
TableSchema tableSchema = table.getSchema();
Set<String> optionalProperties = new HashSet<>(Arrays.asList(KUDU_REPLICAS));
Set<String> requiredProperties = new HashSet<>(Arrays.asList(KUDU_HASH_COLS));
if (!tableSchema.getPrimaryKey().isPresent()) {
requiredProperties.add(KUDU_PRIMARY_KEY_COLS);
}
if (!tableProperties.keySet().containsAll(requiredProperties)) {
throw new CatalogException("Missing required property. The following properties must be provided: " +
requiredProperties.toString());
}
Set<String> permittedProperties = Sets.union(requiredProperties, optionalProperties);
if (!permittedProperties.containsAll(tableProperties.keySet())) {
throw new CatalogException("Unpermitted properties were given. The following properties are allowed:" +
permittedProperties.toString());
}
String tableName = tablePath.getObjectName();
KuduTableInfo tableInfo = KuduTableUtils.createTableInfo(tableName, tableSchema, tableProperties);
createTable(tableInfo, ignoreIfExists);
}