in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java [128:182]
public TableSchema parseCreateTableStatement(
SourceConnector sourceConnector,
String ddl,
String dorisTable,
DorisTableConfig dorisTableConfig) {
try {
Statement statement = CCJSqlParserUtil.parse(ddl);
if (statement instanceof CreateTable) {
CreateTable createTable = (CreateTable) statement;
Map<String, FieldSchema> columnFields = new LinkedHashMap<>();
List<String> pkKeys = new ArrayList<>();
createTable
.getColumnDefinitions()
.forEach(
column -> {
String columnName = column.getColumnName();
List<String> columnSpecs = column.getColumnSpecs();
FieldSchema fieldSchema =
getFieldSchema(
column.getColumnName(),
column.getColumnSpecs(),
column.getColDataType(),
sourceConnector);
columnFields.put(columnName, fieldSchema);
extractColumnPrimaryKey(columnName, columnSpecs, pkKeys);
});
List<Index> indexes = createTable.getIndexes();
extractIndexesPrimaryKey(indexes, pkKeys);
String[] dbTable = dorisTable.split("\\.");
Preconditions.checkArgument(dbTable.length == 2);
return DorisSchemaFactory.createTableSchema(
dbTable[0],
dbTable[1],
columnFields,
pkKeys,
dorisTableConfig,
extractTableComment(createTable.getTableOptionsStrings()));
} else {
LOG.warn(
"Unsupported statement type. ddl={}, sourceConnector={}, dorisTable={}",
ddl,
sourceConnector.getConnectorName(),
dorisTable);
}
} catch (JSQLParserException e) {
LOG.warn(
"Failed to parse create table statement. ddl={}, sourceConnector={}, dorisTable={}",
ddl,
sourceConnector.getConnectorName(),
dorisTable);
}
return null;
}