in pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java [66:159]
public static List<FlatMessage> messageConverter(Message message) {
try {
if (message == null) {
return null;
}
List<FlatMessage> flatMessages = new ArrayList<>();
List<CanalEntry.Entry> entrys = null;
if (message.isRaw()) {
List<ByteString> rawEntries = message.getRawEntries();
entrys = new ArrayList<CanalEntry.Entry>(rawEntries.size());
for (ByteString byteString : rawEntries) {
CanalEntry.Entry entry = CanalEntry.Entry.parseFrom(byteString);
entrys.add(entry);
}
} else {
entrys = message.getEntries();
}
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
+ entry.toString(), e);
}
CanalEntry.EventType eventType = rowChange.getEventType();
FlatMessage flatMessage = new FlatMessage(message.getId());
flatMessages.add(flatMessage);
flatMessage.setDatabase(entry.getHeader().getSchemaName());
flatMessage.setTable(entry.getHeader().getTableName());
flatMessage.setIsDdl(rowChange.getIsDdl());
flatMessage.setType(eventType.toString());
flatMessage.setEs(entry.getHeader().getExecuteTime());
flatMessage.setTs(System.currentTimeMillis());
flatMessage.setSql(rowChange.getSql());
if (!rowChange.getIsDdl()) {
List<Map<String, String>> data = new ArrayList<>();
List<Map<String, String>> old = new ArrayList<>();
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
&& eventType != CanalEntry.EventType.DELETE) {
continue;
}
List<CanalEntry.Column> columns;
if (eventType == CanalEntry.EventType.DELETE) {
columns = rowData.getBeforeColumnsList();
} else {
columns = rowData.getAfterColumnsList();
}
columns.size();
for (CanalEntry.Column column : columns) {
Map<String, String> row = genColumn(column);
if (column.getUpdated()) {
row.put("updated", "1");
} else {
row.put("updated", "0");
}
data.add(row);
}
if (eventType == CanalEntry.EventType.UPDATE) {
for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
Map<String, String> rowOld = genColumn(column);
old.add(rowOld);
}
}
}
if (!data.isEmpty()) {
flatMessage.setData(data);
}
if (!old.isEmpty()) {
flatMessage.setOld(old);
}
}
}
return flatMessages;
} catch (Exception e) {
throw new RuntimeException(e);
}
}