in hbase-hbck2/src/main/java/org/apache/hbase/hbck1/HBaseFsck.java [5522:5594]
public void cleanReplicationBarrier() throws IOException {
if (!cleanReplicationBarrier || cleanReplicationBarrierTable == null) {
return;
}
if (cleanReplicationBarrierTable.isSystemTable()) {
errors.reportError(ErrorReporter.ERROR_CODE.INVALID_TABLE,
"invalid table: " + cleanReplicationBarrierTable);
return;
}
boolean isGlobalScope = false;
try {
isGlobalScope =
admin.getDescriptor(cleanReplicationBarrierTable).hasGlobalReplicationScope();
} catch (TableNotFoundException e) {
LOG.info("we may need to clean some erroneous data due to bugs");
}
if (isGlobalScope) {
errors.reportError(ErrorReporter.ERROR_CODE.INVALID_TABLE,
"table's replication scope is global: " + cleanReplicationBarrierTable);
return;
}
List<byte[]> regionNames = new ArrayList<>();
Scan barrierScan = new Scan();
barrierScan.setCaching(100);
barrierScan.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
barrierScan
.withStartRow(HBCKMetaTableAccessor.getTableStartRowForMeta(cleanReplicationBarrierTable))
.withStopRow(HBCKMetaTableAccessor.getTableStopRowForMeta(cleanReplicationBarrierTable));
Result result;
try (ResultScanner scanner = meta.getScanner(barrierScan)) {
while ((result = scanner.next()) != null) {
regionNames.add(result.getRow());
}
}
if (regionNames.size() <= 0) {
errors.reportError(ErrorReporter.ERROR_CODE.INVALID_TABLE,
"No replication barrier(s) on table: " + cleanReplicationBarrierTable);
return;
}
ReplicationQueueStorage queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
List<ReplicationPeerDescription> peerDescriptions = admin.listReplicationPeers();
if (peerDescriptions != null && peerDescriptions.size() > 0) {
List<String> peers = peerDescriptions.stream()
.filter(peerConfig -> peerConfig.getPeerConfig()
.needToReplicate(cleanReplicationBarrierTable))
.map(ReplicationPeerDescription::getPeerId).collect(Collectors.toList());
try {
List<String> batch = new ArrayList<>();
for (String peer : peers) {
for (byte[] regionName : regionNames) {
batch.add(RegionInfo.encodeRegionName(regionName));
if (batch.size() % 100 == 0) {
queueStorage.removeLastSequenceIds(peer, batch);
batch.clear();
}
}
if (batch.size() > 0) {
queueStorage.removeLastSequenceIds(peer, batch);
batch.clear();
}
}
} catch (ReplicationException re) {
throw new IOException(re);
}
}
for (byte[] regionName : regionNames) {
meta.delete(new Delete(regionName).addFamily(HConstants.REPLICATION_BARRIER_FAMILY));
}
setShouldRerun();
}