private static CanalConnectRecord internParse()

in eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java [200:300]


    /**
     * Builds a {@link CanalConnectRecord} from one row of a row-change entry.
     *
     * <p>Header fields (table, schema, execute time, log file name and offset) are copied from
     * {@code entry}, the event type is taken from {@code rowChange}, and the row's columns are split
     * into key columns, old key columns (updates only) and non-key columns. Each of the three lists
     * is sorted with {@code EventColumnIndexComparable} before being stored on the record.
     *
     * @param canalSourceConfig source configuration; consulted for GTID mode, the MariaDB flag,
     *                          the server UUID and the sync mode
     * @param entry             entry whose header supplies the record metadata and source type
     * @param rowChange         row change that supplies the event type
     * @param rowData           row that supplies the before and after column lists
     * @return the populated record
     * @throws RuntimeException if no key column was collected for the row
     */
    private static CanalConnectRecord internParse(CanalSourceIncrementConfig canalSourceConfig, Entry entry,
        RowChange rowChange, RowData rowData) {
        CanalConnectRecord result = new CanalConnectRecord();
        result.setTableName(entry.getHeader().getTableName());
        result.setSchemaName(entry.getHeader().getSchemaName());
        result.setEventType(EventType.valueOf(rowChange.getEventType().name()));
        result.setExecuteTime(entry.getHeader().getExecuteTime());
        result.setJournalName(entry.getHeader().getLogfileName());
        result.setBinLogOffset(entry.getHeader().getLogfileOffset());

        // GTID fields are only populated when GTID mode is enabled.
        if (canalSourceConfig.isGTIDMode()) {
            String currentGtid;
            String gtid;
            if (canalSourceConfig.isMariaDB()) {
                // MariaDB: the header GTID is used both as the GTID and as the current GTID.
                currentGtid = entry.getHeader().getGtid();
                gtid = currentGtid;
            } else {
                // Otherwise the current GTID is read from the first header property and the
                // header GTID is passed through replaceGtidRange.
                currentGtid = entry.getHeader().getPropsList().get(0).getValue();
                gtid = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID());
            }
            result.setGtid(gtid);
            result.setCurrentGtid(currentGtid);
        }

        EventType eventType = result.getEventType();

        List<Column> beforeImage = rowData.getBeforeColumnsList();
        List<Column> afterImage = rowData.getAfterColumnsList();

        boolean rowMode = canalSourceConfig.getSyncMode().isRow();

        // Keyed by column name so that a later put for the same column replaces the earlier one.
        Map<String, EventColumn> keysByName = new LinkedHashMap<>();
        Map<String, EventColumn> oldKeysByName = new LinkedHashMap<>();
        Map<String, EventColumn> othersByName = new LinkedHashMap<>();

        boolean insert = eventType.isInsert();
        if (insert || eventType.isDelete()) {
            // Inserts are read from the after image, deletes from the before image.
            List<Column> image = insert ? afterImage : beforeImage;
            for (Column col : image) {
                Map<String, EventColumn> target = col.getIsKey() ? keysByName : othersByName;
                target.put(col.getName(), copyEventColumn(col, true));
            }
        } else if (eventType.isUpdate()) {
            boolean fromOracle = entry.getHeader().getSourceType() == CanalEntry.Type.ORACLE;
            boolean fromMysql = entry.getHeader().getSourceType() == CanalEntry.Type.MYSQL;

            for (Column col : beforeImage) {
                if (col.getIsKey()) {
                    // Seed the current keys from the before image; any key column present in
                    // the after image replaces the seeded value in the loop below.
                    oldKeysByName.put(col.getName(), copyEventColumn(col, true));
                    keysByName.put(col.getName(), copyEventColumn(col, true));
                    continue;
                }
                // Non-key before-image values are kept only for Oracle sources in row mode.
                if (rowMode && fromOracle) {
                    othersByName.put(col.getName(), copyEventColumn(col, true));
                }
            }

            for (Column col : afterImage) {
                if (col.getIsKey()) {
                    keysByName.put(col.getName(), copyEventColumn(col, true));
                    continue;
                }
                // Outside row mode and for non-Oracle sources, untouched columns are skipped.
                if (!rowMode && !fromOracle && !col.getUpdated()) {
                    continue;
                }
                // Only MySQL sources pass the column's own updated flag; others always pass true.
                boolean updated = !fromMysql || col.getUpdated();
                othersByName.put(col.getName(), copyEventColumn(col, updated));
            }

            if (fromOracle) {
                checkUpdateKeyColumns(oldKeysByName, keysByName);
            }
        }

        List<EventColumn> keys = new ArrayList<>(keysByName.values());
        List<EventColumn> oldKeys = new ArrayList<>(oldKeysByName.values());
        List<EventColumn> columns = new ArrayList<>(othersByName.values());

        keys.sort(new EventColumnIndexComparable());
        oldKeys.sort(new EventColumnIndexComparable());
        columns.sort(new EventColumnIndexComparable());

        // A row for which no key column was collected is rejected.
        if (keysByName.isEmpty()) {
            throw new RuntimeException("this row data has no pks , entry: " + entry + " and rowData: "
                + rowData);
        }

        result.setKeys(keys);
        // Old keys are recorded only when an update actually changed the key values.
        if (result.getEventType().isUpdate() && !oldKeys.equals(keys)) {
            result.setOldKeys(oldKeys);
        }
        result.setColumns(columns);

        return result;
    }