private List parserDebeziumStructure()

in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java [160:213]


    private List<String> parserDebeziumStructure(String dorisTable, String ddl, JsonNode record)
            throws JsonProcessingException {
        JsonNode tableChange = extractTableChange(record);
        if (Objects.isNull(tableChange) || Objects.isNull(ddl)) {
            LOG.warn(
                    "tableChange or ddl is empty, cannot do schema change. dorisTable={}, tableChange={}, ddl={}",
                    dorisTable,
                    tableChange,
                    ddl);
            return null;
        }

        JsonNode columns = tableChange.get("table").get("columns");
        if (Objects.isNull(sourceConnector)) {
            sourceConnector =
                    SourceConnector.valueOf(
                            record.get("source").get("connector").asText().toUpperCase());
        }
        if (!filledTables.contains(dorisTable)) {
            fillOriginSchema(dorisTable, columns);
            filledTables.add(dorisTable);
        }

        Map<String, FieldSchema> fieldSchemaMap = originFieldSchemaMap.get(dorisTable);
        // remove backtick
        ddl = ddl.replace("`", "");
        // rename ddl
        // It is better to use sql_parser mode for rename
        Matcher renameDdlMatcher = renameDDLPattern.matcher(ddl);
        if (renameDdlMatcher.find()) {
            String oldColumnName = renameDdlMatcher.group(2);
            String newColumnName = renameDdlMatcher.group(3);
            // Change operation
            if (oldColumnName == null) {
                oldColumnName = renameDdlMatcher.group(4);
                newColumnName = renameDdlMatcher.group(5);
            }
            return SchemaChangeHelper.generateRenameDDLSql(
                    dorisTable, oldColumnName, newColumnName, fieldSchemaMap);
        }
        // add/drop ddl
        Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
        for (JsonNode column : columns) {
            buildFieldSchema(updateFiledSchema, column);
        }
        SchemaChangeHelper.compareSchema(updateFiledSchema, fieldSchemaMap);
        // In order to avoid other source table column change operations other than add/drop/rename,
        // which may lead to the accidental deletion of the doris column.
        Matcher matcher = addDropDDLPattern.matcher(ddl);
        if (!matcher.find()) {
            return null;
        }
        return SchemaChangeHelper.generateDDLSql(dorisTable);
    }