public boolean replicate()

in rocketmq-hbase/rocketmq-hbase-sink/src/main/java/org/apache/rocketmq/hbase/sink/Replicator.java [140:180]


    public boolean replicate(ReplicateContext context) {
        final List<WAL.Entry> entries = context.getEntries();

        final Map<String, List<WAL.Entry>> entriesByTable = entries.stream()
                .filter(entry -> tables.contains(entry.getKey().getTablename().getNameAsString()))
                .collect(groupingBy(entry -> entry.getKey().getTablename().getNameAsString()));

        // replicate data to rocketmq
        Transaction transaction = new Transaction(maxTransactionRows);
        try {
            for (Map.Entry<String, List<WAL.Entry>> entry : entriesByTable.entrySet()) {
                final String tableName = entry.getKey();
                final List<WAL.Entry> tableEntries = entry.getValue();

                for (WAL.Entry tableEntry : tableEntries) {
                    List<Cell> cells = tableEntry.getEdit().getCells();

                    // group entries by the row key
                    Map<byte[], List<Cell>> columnsByRow = cells.stream().collect(groupingBy(CellUtil::cloneRow));

                    for (Map.Entry<byte[], List<Cell>> rowCols : columnsByRow.entrySet()) {
                        final byte[] row = rowCols.getKey();
                        final List<Cell> columns = rowCols.getValue();

                        if (!transaction.addRow(tableName, row, columns)) {
                            producer.push(transaction.toJson());
                            transaction = new Transaction(maxTransactionRows);
                        }
                    }
                }
            }

            // replicate remaining transaction
            producer.push(transaction.toJson());
        } catch (Exception e) {
            logger.error("Error while sending message to RocketMQ.", e);
            return false;
        }

        return true;
    }