in flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/serializer/jsondebezium/SQLParserSchemaChange.java [66:107]
/**
 * Handles a single schema-change record by dispatching on its event type.
 *
 * <p>For {@code EventType.CREATE} the create-table statement is parsed and handed to
 * {@code schemaChangeManager.createTable}; when that call reports success, the
 * CDC-table-to-Doris-table pair is added to the table mapping. For {@code EventType.ALTER} the
 * alter DDLs are parsed and passed to {@code executeAlterDDLs}. Any other event type leaves the
 * result at {@code false}.
 *
 * <p>Exceptions raised while processing are logged at WARN level and are not propagated; the
 * method then returns whatever result had been recorded up to that point.
 *
 * @param recordRoot root JSON node of the record to process
 * @return {@code true} if the create/alter step reported success; {@code false} if the record
 *     was rejected by {@code checkTable}, its event type could not be extracted, no Doris table
 *     tuple was found for an ALTER, or the step did not succeed
 */
public boolean schemaChange(JsonNode recordRoot) {
    boolean succeeded = false;
    try {
        // The table check is only consulted when a source table name has been set.
        boolean sourceTableNameSet = !StringUtils.isNullOrWhitespaceOnly(sourceTableName);
        if (sourceTableNameSet && !checkTable(recordRoot)) {
            return false;
        }

        EventType eventType = extractEventType(recordRoot);
        if (eventType == null) {
            LOG.warn("Failed to parse eventType. recordRoot={}", recordRoot);
            return false;
        }

        if (eventType.equals(EventType.CREATE)) {
            String createTarget = getCreateTableIdentifier(recordRoot);
            TableSchema parsedSchema = tryParseCreateTableStatement(recordRoot, createTarget);
            // Recorded before the mapping update so that a later failure in this branch
            // still reports the outcome of createTable.
            succeeded = schemaChangeManager.createTable(parsedSchema);
            if (succeeded) {
                String cdcIdentifier = getCdcTableIdentifier(recordRoot);
                // The Doris identifier is resolved again here rather than reusing createTarget.
                String dorisIdentifier = getCreateTableIdentifier(recordRoot);
                changeContext.getTableMapping().put(cdcIdentifier, dorisIdentifier);
                this.tableMapping = changeContext.getTableMapping();
                LOG.info(
                        "create table ddl status: {}, add tableMapping {},{}",
                        succeeded,
                        cdcIdentifier,
                        dorisIdentifier);
            }
        } else if (eventType.equals(EventType.ALTER)) {
            Tuple2<String, String> dorisTablePair = getDorisTableTuple(recordRoot);
            if (dorisTablePair == null) {
                LOG.warn("Failed to get doris table tuple. record={}", recordRoot);
                return false;
            }
            List<String> alterStatements = tryParseAlterDDLs(recordRoot);
            succeeded = executeAlterDDLs(alterStatements, recordRoot, dorisTablePair, succeeded);
        }
    } catch (Exception ex) {
        // Best effort: the failure is logged and the current result is returned below.
        LOG.warn("schema change error : ", ex);
    }
    return succeeded;
}