public static List messageConverter()

in connector/core/src/main/java/com/alibaba/otter/canal/connector/core/producer/MQMessageUtils.java [355:454]


    /**
     * Converts parsed entries into {@link FlatMessage} objects.
     * <p>
     * One flat message is produced per entry whose type is neither
     * {@code TRANSACTIONBEGIN} nor {@code TRANSACTIONEND}. Every message carries the
     * schema, table, DDL flag, event type, execute time, SQL and GTID of its entry,
     * plus the conversion wall-clock time as {@code ts}. For non-DDL INSERT / UPDATE /
     * DELETE events the row images are flattened as well:
     * <ul>
     * <li>{@code data} - one map per row, built from the before-image for DELETE and
     * from the after-image otherwise; SQL NULL columns map to {@code null};</li>
     * <li>{@code old} - UPDATE only: one map per row holding the before-image value of
     * the columns flagged as updated in <em>that same row</em>;</li>
     * <li>{@code sqlType} / {@code mysqlType} - column name to type, collected over all
     * rows;</li>
     * <li>primary-key names - taken from the key columns of the first row only.</li>
     * </ul>
     * Collections that end up empty are not set on the message.
     *
     * @param datas parsed entries to convert; must not be {@code null}
     * @param id    identifier passed to the constructor of every created {@link FlatMessage}
     * @return the flat messages in entry order; empty when nothing was converted
     */
    public static List<FlatMessage> messageConverter(EntryRowData[] datas, long id) {
        List<FlatMessage> flatMessages = new ArrayList<>();
        for (EntryRowData entryRowData : datas) {
            CanalEntry.Entry entry = entryRowData.entry;
            CanalEntry.RowChange rowChange = entryRowData.rowChange;
            // Transaction begin/end entries are skipped unconditionally: they never
            // produce a flat message.
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            // build flatMessage
            CanalEntry.Header header = entry.getHeader();
            CanalEntry.EventType eventType = rowChange.getEventType();
            FlatMessage flatMessage = new FlatMessage(id);
            flatMessages.add(flatMessage);
            flatMessage.setDatabase(header.getSchemaName());
            flatMessage.setTable(header.getTableName());
            flatMessage.setIsDdl(rowChange.getIsDdl());
            flatMessage.setType(eventType.toString());
            flatMessage.setEs(header.getExecuteTime());
            flatMessage.setTs(System.currentTimeMillis());
            flatMessage.setSql(rowChange.getSql());
            flatMessage.setGtid(header.getGtid());

            // Row images are only flattened for non-DDL INSERT/UPDATE/DELETE events.
            // The event type is constant for the whole entry, so it is checked once
            // here instead of once per row.
            boolean isDml = eventType == CanalEntry.EventType.INSERT
                            || eventType == CanalEntry.EventType.UPDATE
                            || eventType == CanalEntry.EventType.DELETE;
            if (rowChange.getIsDdl() || !isDml) {
                continue;
            }

            Map<String, Integer> sqlType = new LinkedHashMap<>();
            Map<String, String> mysqlType = new LinkedHashMap<>();
            List<Map<String, String>> data = new ArrayList<>();
            List<Map<String, String>> old = new ArrayList<>();

            boolean hasInitPkNames = false;
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                // DELETE has its values in the before-image; INSERT/UPDATE in the after-image.
                List<CanalEntry.Column> columns = eventType == CanalEntry.EventType.DELETE
                    ? rowData.getBeforeColumnsList()
                    : rowData.getAfterColumnsList();

                Map<String, String> row = new LinkedHashMap<>();
                // Names of the columns flagged as updated in THIS row. The set is scoped
                // per row so that the "old" image of one row is not polluted with columns
                // that only changed in previously processed rows.
                Set<String> updatedColumns = new HashSet<>();

                for (CanalEntry.Column column : columns) {
                    String name = column.getName();
                    // Primary-key names are registered from the first row only.
                    if (!hasInitPkNames && column.getIsKey()) {
                        flatMessage.addPkName(name);
                    }
                    sqlType.put(name, column.getSqlType());
                    mysqlType.put(name, column.getMysqlType());
                    row.put(name, column.getIsNull() ? null : column.getValue());
                    if (column.getUpdated()) {
                        updatedColumns.add(name);
                    }
                }

                hasInitPkNames = true;
                if (!row.isEmpty()) {
                    data.add(row);
                }

                if (eventType == CanalEntry.EventType.UPDATE) {
                    // For updates, record the pre-update value of every changed column.
                    // An (possibly empty) entry is always appended for each row.
                    Map<String, String> rowOld = new LinkedHashMap<>();
                    for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                        if (updatedColumns.contains(column.getName())) {
                            rowOld.put(column.getName(), column.getIsNull() ? null : column.getValue());
                        }
                    }
                    old.add(rowOld);
                }
            }

            // Leave the corresponding message fields untouched when nothing was collected.
            if (!sqlType.isEmpty()) {
                flatMessage.setSqlType(sqlType);
            }
            if (!mysqlType.isEmpty()) {
                flatMessage.setMysqlType(mysqlType);
            }
            if (!data.isEmpty()) {
                flatMessage.setData(data);
            }
            if (!old.isEmpty()) {
                flatMessage.setOld(old);
            }
        }
        return flatMessages;
    }