public boolean schemaChange()

in flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/mongodb/serializer/MongoJsonDebeziumSchemaChange.java [102:157]


    public boolean schemaChange(JsonNode recordRoot) {
        try {
            JsonNode logData = getFullDocument(recordRoot);
            String cdcTableIdentifier = getCdcTableIdentifier(recordRoot);
            String dorisTableIdentifier =
                    getDorisTableIdentifier(cdcTableIdentifier, dorisOptions, tableMapping);

            // if table dorisTableIdentifier is null, create table
            if (StringUtils.isNullOrWhitespaceOnly(dorisTableIdentifier)) {
                String[] split = cdcTableIdentifier.split("\\.");
                String targetDb = changeContext.getTargetDatabase();
                String sourceTable = split[1];
                String dorisTable = changeContext.getTableNameConverter().convert(sourceTable);
                LOG.info(
                        "The table [{}.{}] does not exist. Attempting to create a new table named: {}.{}",
                        targetDb,
                        sourceTable,
                        targetDb,
                        dorisTable);
                tableMapping.put(cdcTableIdentifier, String.format("%s.%s", targetDb, dorisTable));
                dorisTableIdentifier = tableMapping.get(cdcTableIdentifier);
                Map<String, Object> stringObjectMap = extractAfterRow(logData);
                JsonNode jsonNode = objectMapper.valueToTree(stringObjectMap);

                MongoDBSchema mongoSchema = new MongoDBSchema(jsonNode, targetDb, dorisTable, "");

                mongoSchema.setModel(DataModel.UNIQUE);
                DorisTableUtil.tryCreateTableIfAbsent(
                        dorisSystem,
                        targetDb,
                        dorisTable,
                        mongoSchema,
                        changeContext.getDorisTableConf());
            }

            String[] tableInfo = dorisTableIdentifier.split("\\.");
            if (tableInfo.length != 2) {
                throw new DorisRuntimeException();
            }
            String dataBase = tableInfo[0];
            String table = tableInfo[1];
            // build table fields mapping for all record
            buildDorisTableFieldsMapping(dataBase, table);

            // Determine whether change stream log and tableField are exactly the same, if not,
            // perform
            // schema change
            checkAndUpdateSchemaChange(logData, dorisTableIdentifier, dataBase, table);
            formatSpecialFieldData(logData);
            ((ObjectNode) recordRoot).set(FIELD_DATA, logData);
            return true;
        } catch (Exception ex) {
            LOG.warn("schema change error : ", ex);
            return false;
        }
    }