in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java [102:157]
public boolean schemaChange(JsonNode recordRoot) {
    // Processes one change record: makes sure a Doris table is mapped for the record's
    // source table (creating it when no mapping exists), runs the schema comparison/update,
    // and finally stores the processed document on the record under FIELD_DATA.
    //
    // recordRoot: the change record; it is cast to ObjectNode and mutated in place.
    // Returns true when every step completed, false when any step threw. Failures are
    // logged at WARN and never propagated to the caller.
    try {
        JsonNode logData = getFullDocument(recordRoot);
        String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
        String dorisTableIdentifier =
                getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping);
        // No Doris table is mapped to this source table yet: create one.
        if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
            String[] split = cdcTableIdentifier.split("\\.");
            // Guard the index access below so a malformed identifier is reported with
            // its value instead of a bare ArrayIndexOutOfBoundsException.
            if (split.length < 2) {
                throw new DorisRuntimeException(
                        "Invalid cdc table identifier, expected <database>.<table> but got: "
                                + cdcTableIdentifier);
            }
            String targetDb = changeContext.getTargetDatabase();
            String sourceTable = split[1];
            String dorisTable = changeContext.getTableNameConverter().convert(sourceTable);
            LOG.info(
                    "The table [{}.{}] does not exist. Attempting to create a new table named: {}.{}",
                    targetDb,
                    sourceTable,
                    targetDb,
                    dorisTable);
            // The schema of the new table is derived from the current record's row.
            Map<String, Object> stringObjectMap = extractAfterRow(logData);
            JsonNode jsonNode = objectMapper.valueToTree(stringObjectMap);
            MongoDBSchema mongoSchema = new MongoDBSchema(jsonNode, targetDb, dorisTable, "");
            mongoSchema.setModel(DataModel.UNIQUE);
            DorisTableUtil.tryCreateTableIfAbsent(
                    dorisSystem,
                    targetDb,
                    dorisTable,
                    mongoSchema,
                    changeContext.getDorisTableConf());
            // Publish the mapping only after the create call returned. If it threw, the
            // mapping stays absent and the next record for this table retries the creation
            // instead of silently skipping it.
            dorisTableIdentifier = String.format("%s.%s", targetDb, dorisTable);
            tableMapping.put(cdcTableIdentifier, dorisTableIdentifier);
        }
        String[] tableInfo = dorisTableIdentifier.split("\\.");
        if (tableInfo.length != 2) {
            throw new DorisRuntimeException(
                    "Invalid doris table identifier, expected <database>.<table> but got: "
                            + dorisTableIdentifier);
        }
        String dataBase = tableInfo[0];
        String table = tableInfo[1];
        // build table fields mapping for all record
        buildDorisTableFieldsMapping(dataBase, table);
        // Determine whether the change stream log and the table fields are exactly the
        // same; if not, perform a schema change.
        checkAndUpdateSchemaChange(logData, dorisTableIdentifier, dataBase, table);
        formatSpecialFieldData(logData);
        ((ObjectNode) recordRoot).set(FIELD_DATA, logData);
        return true;
    } catch (Exception ex) {
        // Deliberate best-effort: report the failure through the return value.
        LOG.warn("schema change error : ", ex);
        return false;
    }
}