in hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java [196:339]
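  /**
   * Replicate the given WAL entries into the local cluster using the sink's shared connection,
   * operating directly on the protobuf WALEntry type; the cell data for all entries is read, in
   * order, from the supplied scanner. (Descriptive Javadoc added here for the excerpt; wording is
   * reconstructed from the method body below, not copied from upstream.)
   * @param entries                    WAL entries to be replicated
   * @param cells                      scanner over the cells associated with the entries
   * @param replicationClusterId       id used to look up the source cluster specific configuration
   *                                   through the sink's configuration provider
   * @param sourceBaseNamespaceDirPath path to the source cluster's base namespace directory, used
   *                                   when copying bulk loaded hfiles
   * @param sourceHFileArchiveDirPath  path to the source cluster's hfile archive directory
   */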
  public void replicateEntries(List<WALEntry> entries, final ExtendedCellScanner cells,
    String replicationClusterId, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id. We only want to flushCommits once
      // per invocation of this method per table and cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
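      // Mutations that passed through the preReplicationSinkBatchMutate coprocessor hook, paired
      // index-for-index with the WALEntry they came from, so the matching post hook can be called
      // once the batches have been flushed.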
      Pair<List<Mutation>, List<WALEntry>> mutationsToWalEntriesPairs =
        new Pair<>(new ArrayList<>(), new ArrayList<>());
      for (WALEntry entry : entries) {
        TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                this.metrics.incrementFailedBatches();
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
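        // Walk exactly getAssociatedCellCount() cells of the shared scanner for this entry,
        // folding consecutive cells for the same row and type into a single mutation.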
        ExtendedCell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            this.metrics.incrementFailedBatches();
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          ExtendedCell cell = cells.current();
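          // Each cell is one of three kinds: a bulk load descriptor, a replication marker, or an
          // ordinary edit that becomes part of a Put/Delete.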
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name => list of (family, list of hfile paths) pairs, grouped in the
              // outer map by the descriptor's cluster id list.
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) {
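            // Replication marker entries are rewritten as puts against the replication sink
            // tracker table rather than the table named in the WAL key.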
            Mutation put = processReplicationMarkerEntry(cell);
            if (put == null) {
              continue;
            }
            table = REPLICATION_SINK_TRACKER_TABLE_NAME;
            List<UUID> clusterIds = new ArrayList<>();
            for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
              clusterIds.add(toUUID(clusterId));
            }
            put.setClusterIds(clusterIds);
            addToHashMultiMap(rowMap, table, clusterIds, put);
          } else {
            // Handle wal replication
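            // isNewRowOrType starts a fresh Put/Delete whenever the row or cell type changes;
            // e.g. three consecutive cells for the same row and type all land in one mutation.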
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation = CellUtil.isDelete(cell)
                ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME,
                HConstants.EMPTY_BYTE_ARRAY);
              if (rsServerHost != null) {
                rsServerHost.preReplicationSinkBatchMutate(entry, mutation);
                mutationsToWalEntriesPairs.getFirst().add(mutation);
                mutationsToWalEntriesPairs.getSecond().add(entry);
              }
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }
      // TODO Replicating mutations and bulk loaded data can be made parallel
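      // Flush the grouped mutations, one batch() call per table; each call receives the
      // per-cluster-id row lists collected above.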
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
        }
        LOG.debug("Finished replicating mutations.");
      }
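      // Now that the batches have been flushed, run the matching post hook for every mutation
      // that went through preReplicationSinkBatchMutate above.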
      if (rsServerHost != null) {
        List<Mutation> mutations = mutationsToWalEntriesPairs.getFirst();
        List<WALEntry> walEntries = mutationsToWalEntriesPairs.getSecond();
        for (int i = 0; i < mutations.size(); i++) {
          rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i));
        }
      }
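      // Copy bulk loaded hfiles from the source cluster's namespace/archive directories, one
      // HFileReplicator per distinct cluster id list, using the source-cluster-specific conf.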
      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>,
          Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            LOG.debug("Replicating {} bulk loaded data", entry.getKey().toString());
            Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId);
            try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf,
              sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
              getConnection(), entry.getKey())) {
              hFileReplicator.replicate();
              LOG.debug("Finished replicating {} bulk loaded data", entry.getKey().toString());
            }
          }
        }
      }
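      // The write time of the last entry drives the "age of last applied op" metric.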
      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      this.metrics.incrementFailedBatches();
      throw ex;
    }
  }