public void replicateEntries()

in hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java [196:339]


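  /**
   * Replicate the given WAL entries into the local cluster. Entries and their cells travel
   * separately: each WALEntry carries only a count of associated cells, and the cells themselves
   * are read, in order, from the {@code cells} scanner.
   * @param entries                    WAL entries to be replicated
   * @param cells                      scanner over the cells associated with the entries, in order
   * @param replicationClusterId       id identifying the source cluster's FS client configuration
   *                                   in the replication configuration directory
   * @param sourceBaseNamespaceDirPath path to the source cluster's base namespace directory, used
   *                                   to locate bulk loaded hfiles
   * @param sourceHFileArchiveDirPath  path to the source cluster's hfile archive directory
   * @throws IOException if applying the entries fails
   */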
  public void replicateEntries(List<WALEntry> entries, final ExtendedCellScanner cells,
    String replicationClusterId, String sourceBaseNamespaceDirPath,
    String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id; we only want to flushCommits once
      // per invocation of this method per table and per cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();

      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
      Pair<List<Mutation>, List<WALEntry>> mutationsToWalEntriesPairs =
        new Pair<>(new ArrayList<>(), new ArrayList<>());
      for (WALEntry entry : entries) {
        TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
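        // An installed WALEntrySinkFilter can drop whole entries, but the cells belonging to a
        // dropped entry must still be consumed from the scanner to keep it aligned with the
        // remaining entries.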
        if (this.walEntrySinkFilter != null) {
          if (this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
            // Skip Cells in CellScanner associated with this entry.
            int count = entry.getAssociatedCellCount();
            for (int i = 0; i < count; i++) {
              // Throw index out of bounds if our cell count is off
              if (!cells.advance()) {
                this.metrics.incrementFailedBatches();
                throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
              }
            }
            continue;
          }
        }
        ExtendedCell previousCell = null;
        Mutation mutation = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            this.metrics.incrementFailedBatches();
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          ExtendedCell cell = cells.current();
          // Handle replication of bulk loaded hfiles
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
            if (bld.getReplicate()) {
              if (bulkLoadsPerClusters == null) {
                bulkLoadsPerClusters = new HashMap<>();
              }
              // Map of table name to a list of pairs of column family and the hfile paths
              // under that table's namespace
              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
                bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
            }
          } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) {
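            // Replication marker entries are not user data: they are turned into Puts against
            // the replication sink tracker table (or dropped when the helper returns null).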
            Mutation put = processReplicationMarkerEntry(cell);
            if (put == null) {
              continue;
            }
            table = REPLICATION_SINK_TRACKER_TABLE_NAME;
            List<UUID> clusterIds = new ArrayList<>();
            for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
              clusterIds.add(toUUID(clusterId));
            }
            put.setClusterIds(clusterIds);
            addToHashMultiMap(rowMap, table, clusterIds, put);
          } else {
            // Handle ordinary WAL edit replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              mutation = CellUtil.isDelete(cell)
                ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<>(entry.getKey().getClusterIdsList().size());
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              mutation.setClusterIds(clusterIds);
              mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME,
                HConstants.EMPTY_BYTE_ARRAY);
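              // Give sink-side coprocessors a chance to act on the mutation before it is
              // batched; the pairs list remembers each mutation/entry pairing for the matching
              // post-batch hook further down.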
              if (rsServerHost != null) {
                rsServerHost.preReplicationSinkBatchMutate(entry, mutation);
                mutationsToWalEntriesPairs.getFirst().add(mutation);
                mutationsToWalEntriesPairs.getSecond().add(entry);
              }
              addToHashMultiMap(rowMap, table, clusterIds, mutation);
            }
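            // Append the cell to the mutation for the current row; cells with the same row
            // and type share one Put/Delete.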
            if (CellUtil.isDelete(cell)) {
              ((Delete) mutation).add(cell);
            } else {
              ((Put) mutation).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

      // TODO Replicating mutations and bulk loaded data can be made parallel
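      // Apply the grouped mutations table by table; every (table, cluster ids) group is
      // submitted as its own batch.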
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values(), rowSizeWarnThreshold);
        }
        LOG.debug("Finished replicating mutations.");
      }

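      // Run the post-batch coprocessor hook for every mutation that went through the
      // pre-batch hook above, in the same order.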
      if (rsServerHost != null) {
        List<Mutation> mutations = mutationsToWalEntriesPairs.getFirst();
        List<WALEntry> walEntries = mutationsToWalEntriesPairs.getSecond();
        for (int i = 0; i < mutations.size(); i++) {
          rsServerHost.postReplicationSinkBatchMutate(walEntries.get(i), mutations.get(i));
        }
      }

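      // Bulk loaded hfiles are replicated by copying the files from the source cluster's
      // filesystem; each distinct cluster-id list gets its own HFileReplicator run.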
      if (bulkLoadsPerClusters != null) {
        for (Entry<List<String>,
          Map<String, List<Pair<byte[], List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
            LOG.debug("Replicating {} bulk loaded data", entry.getKey().toString());
            Configuration providerConf = this.provider.getConf(this.conf, replicationClusterId);
            try (HFileReplicator hFileReplicator = new HFileReplicator(providerConf,
              sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
              getConnection(), entry.getKey())) {
              hFileReplicator.replicate();
              LOG.debug("Finished replicating {} bulk loaded data", entry.getKey().toString());
            }
          }
        }
      }

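      // Metrics are updated once per successfully applied batch.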
      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      this.metrics.incrementFailedBatches();
      throw ex;
    }
  }
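
For context, minimal sketches of the two grouping helpers the loop above leans on, reconstructed from how they are called here; the real implementations live elsewhere in ReplicationSink and may differ in detail:

  // Start a new Mutation for the first cell, whenever the row changes, or whenever the
  // cell type flips between Put and Delete.
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
      || !CellUtil.matchingRows(previousCell, cell);
  }

  // Group rows first by table, then by the list of cluster ids that have already seen
  // the edit, so each (table, cluster ids) group can be flushed in one batch.
  private <K1, K2, V> void addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
    V value) {
    map.computeIfAbsent(key1, k -> new HashMap<>())
      .computeIfAbsent(key2, k -> new ArrayList<>()).add(value);
  }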