public static List messageConverter()

in pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/MessageUtils.java [66:159]


    /**
     * Converts a Canal {@link Message} into a list of {@link FlatMessage}s.
     *
     * <p>When {@code message.isRaw()} is true, each raw entry is parsed into a
     * {@code CanalEntry.Entry}; otherwise {@code message.getEntries()} is used directly.
     * Entries of type {@code TRANSACTIONBEGIN} or {@code TRANSACTIONEND} are skipped; every
     * other entry produces exactly one {@link FlatMessage} carrying the message id, schema
     * name, table name, DDL flag, event type, execute time, current wall-clock time and SQL.
     *
     * <p>For non-DDL {@code INSERT}, {@code UPDATE} and {@code DELETE} changes, each column of
     * each row is converted with {@code genColumn}, tagged with an {@code "updated"} entry
     * ({@code "1"} or {@code "0"}) and appended to the flat message's data list. The
     * before-image columns are used for {@code DELETE}, the after-image columns otherwise.
     * For {@code UPDATE}, the before-image columns are additionally appended (without the
     * {@code "updated"} entry) to the "old" list. Empty lists are not set on the flat message.
     *
     * @param message the Canal message to convert; may be {@code null}
     * @return the converted flat messages (possibly empty), or {@code null} when
     *         {@code message} is {@code null}
     * @throws RuntimeException if an entry's row change cannot be parsed (with a descriptive
     *         message and the original cause), or wrapping any checked exception raised
     *         while parsing raw entries
     */
    public static List<FlatMessage> messageConverter(Message message) {
        if (message == null) {
            return null;
        }

        try {
            List<CanalEntry.Entry> entries;
            if (message.isRaw()) {
                List<ByteString> rawEntries = message.getRawEntries();
                entries = new ArrayList<>(rawEntries.size());
                for (ByteString rawEntry : rawEntries) {
                    entries.add(CanalEntry.Entry.parseFrom(rawEntry));
                }
            } else {
                entries = message.getEntries();
            }

            List<FlatMessage> flatMessages = new ArrayList<>();
            for (CanalEntry.Entry entry : entries) {
                CanalEntry.EntryType entryType = entry.getEntryType();
                if (entryType == CanalEntry.EntryType.TRANSACTIONBEGIN
                        || entryType == CanalEntry.EntryType.TRANSACTIONEND) {
                    // Transaction boundary markers carry no row data.
                    continue;
                }

                CanalEntry.RowChange rowChange;
                try {
                    rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:"
                            + entry.toString(), e);
                }

                CanalEntry.EventType eventType = rowChange.getEventType();

                FlatMessage flatMessage = new FlatMessage(message.getId());
                flatMessages.add(flatMessage);
                flatMessage.setDatabase(entry.getHeader().getSchemaName());
                flatMessage.setTable(entry.getHeader().getTableName());
                flatMessage.setIsDdl(rowChange.getIsDdl());
                flatMessage.setType(eventType.toString());
                flatMessage.setEs(entry.getHeader().getExecuteTime());
                flatMessage.setTs(System.currentTimeMillis());
                flatMessage.setSql(rowChange.getSql());

                // Only non-DDL INSERT / UPDATE / DELETE changes contribute column data. The
                // event type is fixed per row change, so decide once instead of once per row.
                boolean isRowEvent = eventType == CanalEntry.EventType.INSERT
                        || eventType == CanalEntry.EventType.UPDATE
                        || eventType == CanalEntry.EventType.DELETE;
                if (rowChange.getIsDdl() || !isRowEvent) {
                    continue;
                }

                List<Map<String, String>> data = new ArrayList<>();
                List<Map<String, String>> old = new ArrayList<>();

                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    // A DELETE has only a before-image; INSERT and UPDATE report the after-image.
                    List<CanalEntry.Column> columns = eventType == CanalEntry.EventType.DELETE
                            ? rowData.getBeforeColumnsList()
                            : rowData.getAfterColumnsList();

                    for (CanalEntry.Column column : columns) {
                        Map<String, String> row = genColumn(column);
                        row.put("updated", column.getUpdated() ? "1" : "0");
                        data.add(row);
                    }

                    if (eventType == CanalEntry.EventType.UPDATE) {
                        // Keep the pre-update values alongside the new ones.
                        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                            old.add(genColumn(column));
                        }
                    }
                }

                if (!data.isEmpty()) {
                    flatMessage.setData(data);
                }
                if (!old.isEmpty()) {
                    flatMessage.setOld(old);
                }
            }
            return flatMessages;
        } catch (RuntimeException e) {
            // Already unchecked (including the row-change parse failure above); rethrow as-is
            // so its descriptive message is not buried under a second wrapper.
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }