public void cleanReplicationBarrier()

in hbase-hbck2/src/main/java/org/apache/hbase/hbck1/HBaseFsck.java [5522:5594]


  /**
   * Removes the replication barrier entries stored for {@code cleanReplicationBarrierTable}.
   *
   * <p>The method returns immediately when the {@code cleanReplicationBarrier} flag is off or no
   * table has been configured. It reports {@code INVALID_TABLE} and returns without modifying
   * anything when the table is a system table, when its descriptor has global replication scope,
   * or when the scan over {@code meta} finds no rows carrying the replication barrier family.
   *
   * <p>Otherwise it proceeds in three steps:
   * <ol>
   *   <li>for every replication peer that needs to replicate the table, removes the last sequence
   *       ids of the affected regions from the replication queue storage, at most 100 encoded
   *       region names per call;</li>
   *   <li>deletes the replication barrier family from each collected row of {@code meta};</li>
   *   <li>calls {@code setShouldRerun()}.</li>
   * </ol>
   *
   * @throws IOException if the admin, scan or delete calls fail, or wrapping a
   *         {@code ReplicationException} raised by the replication queue storage
   */
  public void cleanReplicationBarrier() throws IOException {
    if (!cleanReplicationBarrier || cleanReplicationBarrierTable == null) {
      return;
    }
    if (cleanReplicationBarrierTable.isSystemTable()) {
      errors.reportError(ErrorReporter.ERROR_CODE.INVALID_TABLE,
        "invalid table: " + cleanReplicationBarrierTable);
      return;
    }

    // A missing table descriptor is tolerated: the flag simply stays false so that leftover
    // barrier rows can still be cleaned up below.
    boolean globallyReplicated = false;
    try {
      globallyReplicated =
          admin.getDescriptor(cleanReplicationBarrierTable).hasGlobalReplicationScope();
    } catch (TableNotFoundException notFound) {
      LOG.info("we may need to clean some erroneous data due to bugs");
    }
    if (globallyReplicated) {
      errors.reportError(ErrorReporter.ERROR_CODE.INVALID_TABLE,
        "table's replication scope is global: " + cleanReplicationBarrierTable);
      return;
    }

    // Collect the row key of every meta row for this table that has barrier data.
    Scan scan = new Scan()
        .setCaching(100)
        .addFamily(HConstants.REPLICATION_BARRIER_FAMILY)
        .withStartRow(HBCKMetaTableAccessor.getTableStartRowForMeta(cleanReplicationBarrierTable))
        .withStopRow(HBCKMetaTableAccessor.getTableStopRowForMeta(cleanReplicationBarrierTable));
    List<byte[]> barrierRows = new ArrayList<>();
    try (ResultScanner scanner = meta.getScanner(scan)) {
      for (Result row = scanner.next(); row != null; row = scanner.next()) {
        barrierRows.add(row.getRow());
      }
    }
    if (barrierRows.isEmpty()) {
      errors.reportError(ErrorReporter.ERROR_CODE.INVALID_TABLE,
        "No replication barrier(s) on table: " + cleanReplicationBarrierTable);
      return;
    }

    ReplicationQueueStorage storage =
        ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
    List<ReplicationPeerDescription> allPeers = admin.listReplicationPeers();
    if (allPeers != null && !allPeers.isEmpty()) {
      // Only peers configured to replicate this table hold sequence ids for its regions.
      List<String> peerIds = allPeers.stream()
          .filter(description ->
              description.getPeerConfig().needToReplicate(cleanReplicationBarrierTable))
          .map(ReplicationPeerDescription::getPeerId)
          .collect(Collectors.toList());
      final int batchLimit = 100;
      try {
        List<String> pending = new ArrayList<>();
        for (String peerId : peerIds) {
          for (byte[] row : barrierRows) {
            pending.add(RegionInfo.encodeRegionName(row));
            // Flush as soon as a full batch has accumulated.
            if (pending.size() == batchLimit) {
              storage.removeLastSequenceIds(peerId, pending);
              pending.clear();
            }
          }
          // Flush whatever is left over for this peer before moving on to the next one.
          if (!pending.isEmpty()) {
            storage.removeLastSequenceIds(peerId, pending);
            pending.clear();
          }
        }
      } catch (ReplicationException re) {
        throw new IOException(re);
      }
    }

    // Finally drop the barrier family from each affected meta row, one delete per row.
    for (byte[] row : barrierRows) {
      Delete barrierDelete = new Delete(row);
      barrierDelete.addFamily(HConstants.REPLICATION_BARRIER_FAMILY);
      meta.delete(barrierDelete);
    }
    setShouldRerun();
  }