in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/JsonDebeziumSchemaChangeImplV2.java [160:213]
private List<String> parserDebeziumStructure(String dorisTable, String ddl, JsonNode record)
throws JsonProcessingException {
JsonNode tableChange = extractTableChange(record);
if (Objects.isNull(tableChange) || Objects.isNull(ddl)) {
LOG.warn(
"tableChange or ddl is empty, cannot do schema change. dorisTable={}, tableChange={}, ddl={}",
dorisTable,
tableChange,
ddl);
return null;
}
JsonNode columns = tableChange.get("table").get("columns");
if (Objects.isNull(sourceConnector)) {
sourceConnector =
SourceConnector.valueOf(
record.get("source").get("connector").asText().toUpperCase());
}
if (!filledTables.contains(dorisTable)) {
fillOriginSchema(dorisTable, columns);
filledTables.add(dorisTable);
}
Map<String, FieldSchema> fieldSchemaMap = originFieldSchemaMap.get(dorisTable);
// remove backtick
ddl = ddl.replace("`", "");
// rename ddl
// It is better to use sql_parser mode for rename
Matcher renameDdlMatcher = renameDDLPattern.matcher(ddl);
if (renameDdlMatcher.find()) {
String oldColumnName = renameDdlMatcher.group(2);
String newColumnName = renameDdlMatcher.group(3);
// Change operation
if (oldColumnName == null) {
oldColumnName = renameDdlMatcher.group(4);
newColumnName = renameDdlMatcher.group(5);
}
return SchemaChangeHelper.generateRenameDDLSql(
dorisTable, oldColumnName, newColumnName, fieldSchemaMap);
}
// add/drop ddl
Map<String, FieldSchema> updateFiledSchema = new LinkedHashMap<>();
for (JsonNode column : columns) {
buildFieldSchema(updateFiledSchema, column);
}
SchemaChangeHelper.compareSchema(updateFiledSchema, fieldSchemaMap);
// In order to avoid other source table column change operations other than add/drop/rename,
// which may lead to the accidental deletion of the doris column.
Matcher matcher = addDropDDLPattern.matcher(ddl);
if (!matcher.find()) {
return null;
}
return SchemaChangeHelper.generateDDLSql(dorisTable);
}