in rocketmq-hbase/rocketmq-hbase-sink/src/main/java/org/apache/rocketmq/hbase/sink/Replicator.java [140:180]
public boolean replicate(ReplicateContext context) {
final List<WAL.Entry> entries = context.getEntries();
final Map<String, List<WAL.Entry>> entriesByTable = entries.stream()
.filter(entry -> tables.contains(entry.getKey().getTablename().getNameAsString()))
.collect(groupingBy(entry -> entry.getKey().getTablename().getNameAsString()));
// replicate data to rocketmq
Transaction transaction = new Transaction(maxTransactionRows);
try {
for (Map.Entry<String, List<WAL.Entry>> entry : entriesByTable.entrySet()) {
final String tableName = entry.getKey();
final List<WAL.Entry> tableEntries = entry.getValue();
for (WAL.Entry tableEntry : tableEntries) {
List<Cell> cells = tableEntry.getEdit().getCells();
// group entries by the row key
Map<byte[], List<Cell>> columnsByRow = cells.stream().collect(groupingBy(CellUtil::cloneRow));
for (Map.Entry<byte[], List<Cell>> rowCols : columnsByRow.entrySet()) {
final byte[] row = rowCols.getKey();
final List<Cell> columns = rowCols.getValue();
if (!transaction.addRow(tableName, row, columns)) {
producer.push(transaction.toJson());
transaction = new Transaction(maxTransactionRows);
}
}
}
}
// replicate remaining transaction
producer.push(transaction.toJson());
} catch (Exception e) {
logger.error("Error while sending message to RocketMQ.", e);
return false;
}
return true;
}