public TableSchema parseCreateTableStatement()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SQLParserSchemaManager.java [128:182]


    /**
     * Parses a {@code CREATE TABLE} DDL statement and builds the corresponding Doris table
     * schema.
     *
     * <p>Column definitions are converted to {@link FieldSchema} entries in declaration order.
     * Primary keys are collected both from column-level specs and from the table's index
     * definitions. The table comment is extracted from the table option strings.
     *
     * @param sourceConnector the upstream connector; passed to the field-schema conversion and
     *     used for log context
     * @param ddl the DDL text to parse
     * @param dorisTable the target Doris table identifier, expected as {@code database.table}
     * @param dorisTableConfig table configuration forwarded to the schema factory
     * @return the resulting {@link TableSchema}, or {@code null} if {@code ddl} cannot be parsed
     *     or is not a {@code CREATE TABLE} statement
     * @throws IllegalArgumentException if {@code dorisTable} does not split into exactly two
     *     parts on {@code '.'}
     */
    public TableSchema parseCreateTableStatement(
            SourceConnector sourceConnector,
            String ddl,
            String dorisTable,
            DorisTableConfig dorisTableConfig) {
        try {
            Statement statement = CCJSqlParserUtil.parse(ddl);
            if (!(statement instanceof CreateTable)) {
                LOG.warn(
                        "Unsupported statement type. ddl={}, sourceConnector={}, dorisTable={}",
                        ddl,
                        sourceConnector.getConnectorName(),
                        dorisTable);
                return null;
            }

            CreateTable createTable = (CreateTable) statement;
            // LinkedHashMap keeps the columns in the order they are declared in the DDL.
            Map<String, FieldSchema> columnFields = new LinkedHashMap<>();
            List<String> pkKeys = new ArrayList<>();
            createTable
                    .getColumnDefinitions()
                    .forEach(
                            column -> {
                                String columnName = column.getColumnName();
                                List<String> columnSpecs = column.getColumnSpecs();
                                FieldSchema fieldSchema =
                                        getFieldSchema(
                                                columnName,
                                                columnSpecs,
                                                column.getColDataType(),
                                                sourceConnector);
                                columnFields.put(columnName, fieldSchema);
                                // Column-level primary key declarations.
                                extractColumnPrimaryKey(columnName, columnSpecs, pkKeys);
                            });

            // Table-level primary key declarations are carried in the index list.
            List<Index> indexes = createTable.getIndexes();
            extractIndexesPrimaryKey(indexes, pkKeys);

            String[] dbTable = dorisTable.split("\\.");
            Preconditions.checkArgument(
                    dbTable.length == 2,
                    "dorisTable must be in the form 'database.table', but was: " + dorisTable);

            return DorisSchemaFactory.createTableSchema(
                    dbTable[0],
                    dbTable[1],
                    columnFields,
                    pkKeys,
                    dorisTableConfig,
                    extractTableComment(createTable.getTableOptionsStrings()));
        } catch (JSQLParserException e) {
            // Pass the exception as the trailing argument so the parser's message and stack
            // trace are logged instead of being silently dropped.
            LOG.warn(
                    "Failed to parse create table statement. ddl={}, sourceConnector={}, dorisTable={}",
                    ddl,
                    sourceConnector.getConnectorName(),
                    dorisTable,
                    e);
            return null;
        }
    }