public DatanodeDeletedBlockTransactions getTransactions()

in hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java [384:496]


  public DatanodeDeletedBlockTransactions getTransactions(
      int blockDeletionLimit, Set<DatanodeDetails> dnList)
      throws IOException {
    lock.lock();
    try {
      // Here we can clean up the Datanode timeout command that no longer
      // reports heartbeats
      getSCMDeletedBlockTransactionStatusManager().cleanAllTimeoutSCMCommand(
          scmCommandTimeoutMs);
      DatanodeDeletedBlockTransactions transactions =
          new DatanodeDeletedBlockTransactions();
      try (TableIterator<Long,
          ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
               deletedBlockLogStateManager.getReadOnlyIterator()) {
        if (lastProcessedTransactionId != -1) {
          iter.seek(lastProcessedTransactionId);
          /*
           * We should start from (lastProcessedTransactionId + 1) transaction.
           * Now the iterator (iter.next call) is pointing at
           * lastProcessedTransactionId, read the current value to move
           * the cursor.
           */
          if (iter.hasNext()) {
            /*
             * There is a possibility that the lastProcessedTransactionId got
             * deleted from the table, in that case we have to set
             * lastProcessedTransactionId to next available transaction in the table.
             *
             * By doing this there is a chance that we will skip processing the new
             * lastProcessedTransactionId, that should be ok. We can get to it in the
             * next run.
             */
            lastProcessedTransactionId = iter.next().getKey();
          }

          // If we have reached the end, go to beginning.
          if (!iter.hasNext()) {
            iter.seekToFirst();
            lastProcessedTransactionId = -1;
          }
        }

        // Get the CmdStatus status of the aggregation, so that the current
        // status of the specified transaction can be found faster
        Map<UUID, Map<Long, CmdStatus>> commandStatus =
            getSCMDeletedBlockTransactionStatusManager()
                .getCommandStatusByTxId(dnList.stream().
                map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
        ArrayList<Long> txIDs = new ArrayList<>();
        metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
        Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = null;
        // Here takes block replica count as the threshold to avoid the case
        // that part of replicas committed the TXN and recorded in the
        // SCMDeletedBlockTransactionStatusManager, while they are counted
        // in the threshold.
        while (iter.hasNext() &&
            transactions.getBlocksDeleted() < blockDeletionLimit) {
          keyValue = iter.next();
          DeletedBlocksTransaction txn = keyValue.getValue();
          final ContainerID id = ContainerID.valueOf(txn.getContainerID());
          try {
            // HDDS-7126. When container is under replicated, it is possible
            // that container is deleted, but transactions are not deleted.
            if (containerManager.getContainer(id).isDeleted()) {
              LOG.warn("Container: {} was deleted for the " +
                  "transaction: {}.", id, txn);
              txIDs.add(txn.getTxID());
            } else if (txn.getCount() > -1 && txn.getCount() <= maxRetry
                && !containerManager.getContainer(id).isOpen()) {
              Set<ContainerReplica> replicas = containerManager
                  .getContainerReplicas(
                      ContainerID.valueOf(txn.getContainerID()));
              if (checkInadequateReplica(replicas, txn, dnList)) {
                metrics.incrSkippedTransaction();
                continue;
              }
              getTransaction(
                  txn, transactions, dnList, replicas, commandStatus);
            } else if (txn.getCount() >= maxRetry || containerManager.getContainer(id).isOpen()) {
              metrics.incrSkippedTransaction();
            }
          } catch (ContainerNotFoundException ex) {
            LOG.warn("Container: {} was not found for the transaction: {}.", id, txn);
            txIDs.add(txn.getTxID());
          }

          if (lastProcessedTransactionId == keyValue.getKey()) {
            // We have circled back to the last transaction.
            break;
          }

          if (!iter.hasNext() && lastProcessedTransactionId != -1) {
            /*
             * We started from in-between and reached end of the table,
             * now we should go to the start of the table and process
             * the transactions.
             */
            iter.seekToFirst();
          }
        }

        lastProcessedTransactionId = keyValue != null ? keyValue.getKey() : -1;

        if (!txIDs.isEmpty()) {
          deletedBlockLogStateManager.removeTransactionsFromDB(txIDs);
          metrics.incrBlockDeletionTransactionCompleted(txIDs.size());
        }
      }
      return transactions;
    } finally {
      lock.unlock();
    }
  }