in connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/MQMessageUtils.java [355:454]
public static List<FlatMessage> messageConverter(EntryRowData[] datas, long id) {
List<FlatMessage> flatMessages = new ArrayList<>();
for (EntryRowData entryRowData : datas) {
CanalEntry.Entry entry = entryRowData.entry;
CanalEntry.RowChange rowChange = entryRowData.rowChange;
// 如果有分区路由,则忽略begin/end事件
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
// build flatMessage
CanalEntry.EventType eventType = rowChange.getEventType();
FlatMessage flatMessage = new FlatMessage(id);
flatMessages.add(flatMessage);
flatMessage.setDatabase(entry.getHeader().getSchemaName());
flatMessage.setTable(entry.getHeader().getTableName());
flatMessage.setIsDdl(rowChange.getIsDdl());
flatMessage.setType(eventType.toString());
flatMessage.setEs(entry.getHeader().getExecuteTime());
flatMessage.setTs(System.currentTimeMillis());
flatMessage.setSql(rowChange.getSql());
flatMessage.setGtid(entry.getHeader().getGtid());
if (!rowChange.getIsDdl()) {
Map<String, Integer> sqlType = new LinkedHashMap<>();
Map<String, String> mysqlType = new LinkedHashMap<>();
List<Map<String, String>> data = new ArrayList<>();
List<Map<String, String>> old = new ArrayList<>();
Set<String> updateSet = new HashSet<>();
boolean hasInitPkNames = false;
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
&& eventType != CanalEntry.EventType.DELETE) {
continue;
}
Map<String, String> row = new LinkedHashMap<>();
List<CanalEntry.Column> columns;
if (eventType == CanalEntry.EventType.DELETE) {
columns = rowData.getBeforeColumnsList();
} else {
columns = rowData.getAfterColumnsList();
}
for (CanalEntry.Column column : columns) {
if (!hasInitPkNames && column.getIsKey()) {
flatMessage.addPkName(column.getName());
}
sqlType.put(column.getName(), column.getSqlType());
mysqlType.put(column.getName(), column.getMysqlType());
if (column.getIsNull()) {
row.put(column.getName(), null);
} else {
row.put(column.getName(), column.getValue());
}
// 获取update为true的字段
if (column.getUpdated()) {
updateSet.add(column.getName());
}
}
hasInitPkNames = true;
if (!row.isEmpty()) {
data.add(row);
}
if (eventType == CanalEntry.EventType.UPDATE) {
Map<String, String> rowOld = new LinkedHashMap<>();
for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
if (updateSet.contains(column.getName())) {
if (column.getIsNull()) {
rowOld.put(column.getName(), null);
} else {
rowOld.put(column.getName(), column.getValue());
}
}
}
// update操作将记录修改前的值
old.add(rowOld);
}
}
if (!sqlType.isEmpty()) {
flatMessage.setSqlType(sqlType);
}
if (!mysqlType.isEmpty()) {
flatMessage.setMysqlType(mysqlType);
}
if (!data.isEmpty()) {
flatMessage.setData(data);
}
if (!old.isEmpty()) {
flatMessage.setOld(old);
}
}
}
return flatMessages;
}