in eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java [200:300]
/**
 * Builds a {@link CanalConnectRecord} for a single row of a row-change entry.
 *
 * <p>The entry header's table name, schema name, execute time, log file name and log file offset are
 * copied onto the record together with the row change's event type. When GTID mode is enabled the
 * record's GTID fields are populated as well. The row's columns are then split into key columns,
 * pre-update key columns and non-key columns; each group is sorted with
 * {@link EventColumnIndexComparable} before being stored on the record.
 *
 * @param canalSourceConfig source configuration; consulted for GTID mode, the MariaDB flag, the server
 *                          UUID and the sync mode
 * @param entry             entry whose header describes the change
 * @param rowChange         row change that supplies the event type
 * @param rowData           row that supplies the before and after column lists
 * @return the populated record
 * @throws RuntimeException if no key column was collected for the row
 */
private static CanalConnectRecord internParse(CanalSourceIncrementConfig canalSourceConfig, Entry entry,
    RowChange rowChange, RowData rowData) {
    CanalConnectRecord result = new CanalConnectRecord();
    result.setTableName(entry.getHeader().getTableName());
    result.setSchemaName(entry.getHeader().getSchemaName());
    result.setEventType(EventType.valueOf(rowChange.getEventType().name()));
    result.setExecuteTime(entry.getHeader().getExecuteTime());
    result.setJournalName(entry.getHeader().getLogfileName());
    result.setBinLogOffset(entry.getHeader().getLogfileOffset());

    // GTID fields are only populated when GTID mode is enabled.
    if (canalSourceConfig.isGTIDMode()) {
        final String currentGtid;
        final String gtid;
        if (canalSourceConfig.isMariaDB()) {
            // MariaDB: the header GTID is used for both fields.
            currentGtid = entry.getHeader().getGtid();
            gtid = currentGtid;
        } else {
            // Otherwise the current GTID is the value of the first header property, and the stored
            // GTID is whatever replaceGtidRange derives from the header GTID, the current GTID and
            // the configured server UUID.
            currentGtid = entry.getHeader().getPropsList().get(0).getValue();
            gtid = replaceGtidRange(entry.getHeader().getGtid(), currentGtid, canalSourceConfig.getServerUUID());
        }
        result.setGtid(gtid);
        result.setCurrentGtid(currentGtid);
    }

    EventType eventType = result.getEventType();
    List<Column> beforeImage = rowData.getBeforeColumnsList();
    List<Column> afterImage = rowData.getAfterColumnsList();
    boolean rowMode = canalSourceConfig.getSyncMode().isRow();

    // Insertion-ordered maps keyed by column name; a later put for the same name replaces the value.
    Map<String, EventColumn> keyColumns = new LinkedHashMap<>();
    Map<String, EventColumn> oldKeyColumns = new LinkedHashMap<>();
    Map<String, EventColumn> notKeyColumns = new LinkedHashMap<>();

    if (eventType.isInsert() || eventType.isDelete()) {
        // Inserts are read from the after columns, deletes from the before columns; apart from that
        // both are handled the same way.
        List<Column> image = eventType.isInsert() ? afterImage : beforeImage;
        for (Column column : image) {
            Map<String, EventColumn> target = column.getIsKey() ? keyColumns : notKeyColumns;
            target.put(column.getName(), copyEventColumn(column, true));
        }
    } else if (eventType.isUpdate()) {
        CanalEntry.Type sourceType = entry.getHeader().getSourceType();
        boolean fromOracle = sourceType == CanalEntry.Type.ORACLE;
        boolean fromMysql = sourceType == CanalEntry.Type.MYSQL;

        // Before-image key columns populate both the old-key map and the key map (as separate
        // copies); after-image key columns overwrite same-named key-map entries further down.
        for (Column column : beforeImage) {
            if (column.getIsKey()) {
                oldKeyColumns.put(column.getName(), copyEventColumn(column, true));
                keyColumns.put(column.getName(), copyEventColumn(column, true));
            } else if (rowMode && fromOracle) {
                // Before-image non-key columns are only kept for ORACLE sources in row mode.
                notKeyColumns.put(column.getName(), copyEventColumn(column, true));
            }
        }

        for (Column column : afterImage) {
            if (column.getIsKey()) {
                keyColumns.put(column.getName(), copyEventColumn(column, true));
                continue;
            }
            if (!(rowMode || fromOracle || column.getUpdated())) {
                // Outside row mode, non-ORACLE columns that are not marked updated are skipped.
                continue;
            }
            // For MYSQL sources the flag handed to copyEventColumn mirrors the column's own updated
            // marker; for every other source it is always true.
            boolean updatedFlag = !fromMysql || column.getUpdated();
            notKeyColumns.put(column.getName(), copyEventColumn(column, updatedFlag));
        }

        if (fromOracle) {
            checkUpdateKeyColumns(oldKeyColumns, keyColumns);
        }
    }

    List<EventColumn> keys = new ArrayList<>(keyColumns.values());
    keys.sort(new EventColumnIndexComparable());
    List<EventColumn> oldKeys = new ArrayList<>(oldKeyColumns.values());
    oldKeys.sort(new EventColumnIndexComparable());
    List<EventColumn> columns = new ArrayList<>(notKeyColumns.values());
    columns.sort(new EventColumnIndexComparable());

    if (keyColumns.isEmpty()) {
        throw new RuntimeException("this row data has no pks , entry: " + entry + " and rowData: "
            + rowData);
    }

    result.setKeys(keys);
    // Old keys are only recorded for updates whose sorted key list differs from the sorted old-key list.
    if (result.getEventType().isUpdate() && !oldKeys.equals(keys)) {
        result.setOldKeys(oldKeys);
    }
    result.setColumns(columns);
    return result;
}