in flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java [310:342]
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
checkNotNull(tablePath, "tablePath cannot be null");
checkNotNull(table, "table cannot be null");
if(!databaseExists(tablePath.getDatabaseName())) {
throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
}
if(tableExists(tablePath)){
if(ignoreIfExists){
return;
}
throw new TableAlreadyExistException(getName(), tablePath);
}
Map<String, String> options = table.getOptions();
if (!IDENTIFIER.equals(options.get(CONNECTOR.key()))) {
return;
}
List<String> primaryKeys = getCreateDorisKeys(table.getSchema());
TableSchema schema = new TableSchema();
schema.setDatabase(tablePath.getDatabaseName());
schema.setTable(tablePath.getObjectName());
schema.setTableComment(table.getComment());
schema.setFields(getCreateDorisColumns(table.getSchema()));
schema.setKeys(primaryKeys);
schema.setModel(DataModel.UNIQUE);
schema.setDistributeKeys(primaryKeys);
schema.setProperties(getCreateTableProps(options));
dorisSystem.createTable(schema);
}