in hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java [384:496]
public DatanodeDeletedBlockTransactions getTransactions(
int blockDeletionLimit, Set<DatanodeDetails> dnList)
throws IOException {
lock.lock();
try {
// Clean up timed-out SCM commands for datanodes that no longer
// report heartbeats.
getSCMDeletedBlockTransactionStatusManager().cleanAllTimeoutSCMCommand(
scmCommandTimeoutMs);
DatanodeDeletedBlockTransactions transactions =
new DatanodeDeletedBlockTransactions();
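// Scan the deleted block log with a read-only iterator, resuming from the
// position saved by the previous call.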
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
deletedBlockLogStateManager.getReadOnlyIterator()) {
if (lastProcessedTransactionId != -1) {
iter.seek(lastProcessedTransactionId);
/*
 * We should start from the transaction after lastProcessedTransactionId.
 * After the seek, the iterator is positioned at lastProcessedTransactionId,
 * so read the current value (iter.next) to advance the cursor past it.
 */
if (iter.hasNext()) {
/*
 * The lastProcessedTransactionId may have been deleted from the table;
 * in that case we set lastProcessedTransactionId to the next available
 * transaction in the table.
 *
 * This may cause us to skip processing the new lastProcessedTransactionId,
 * which is fine: we can get to it in the next run.
 */
lastProcessedTransactionId = iter.next().getKey();
}
// If we have reached the end of the table, wrap around to the beginning.
if (!iter.hasNext()) {
iter.seekToFirst();
lastProcessedTransactionId = -1;
}
}
// Aggregate each datanode's CmdStatus keyed by transaction ID so that the
// current status of a given transaction can be looked up quickly.
Map<UUID, Map<Long, CmdStatus>> commandStatus =
getSCMDeletedBlockTransactionStatusManager()
.getCommandStatusByTxId(dnList.stream()
    .map(DatanodeDetails::getUuid).collect(Collectors.toSet()));
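// Transaction IDs whose containers are already deleted or missing; they
// are purged from the DB after the scan.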
ArrayList<Long> txIDs = new ArrayList<>();
metrics.setNumBlockDeletionTransactionDataNodes(dnList.size());
Table.KeyValue<Long, DeletedBlocksTransaction> keyValue = null;
// The threshold counts the block replicas actually scheduled here
// (transactions.getBlocksDeleted()), so replicas that have already committed
// the TXN and are recorded in the SCMDeletedBlockTransactionStatusManager
// are not counted toward the limit.
while (iter.hasNext() &&
transactions.getBlocksDeleted() < blockDeletionLimit) {
keyValue = iter.next();
DeletedBlocksTransaction txn = keyValue.getValue();
final ContainerID id = ContainerID.valueOf(txn.getContainerID());
try {
// HDDS-7126: when a container is under-replicated, it is possible that
// the container has been deleted but its transactions have not.
if (containerManager.getContainer(id).isDeleted()) {
LOG.warn("Container: {} was deleted for the " +
"transaction: {}.", id, txn);
txIDs.add(txn.getTxID());
} else if (txn.getCount() > -1 && txn.getCount() <= maxRetry
&& !containerManager.getContainer(id).isOpen()) {
Set<ContainerReplica> replicas = containerManager
.getContainerReplicas(
ContainerID.valueOf(txn.getContainerID()));
if (checkInadequateReplica(replicas, txn, dnList)) {
metrics.incrSkippedTransaction();
continue;
}
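// Assign this transaction to the datanodes in dnList that hold a replica,
// using the aggregated command status to track what has already been sent.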
getTransaction(
txn, transactions, dnList, replicas, commandStatus);
} else if (txn.getCount() >= maxRetry || containerManager.getContainer(id).isOpen()) {
metrics.incrSkippedTransaction();
}
} catch (ContainerNotFoundException ex) {
LOG.warn("Container: {} was not found for the transaction: {}.", id, txn);
txIDs.add(txn.getTxID());
}
if (lastProcessedTransactionId == keyValue.getKey()) {
// We have circled back to the last transaction.
break;
}
if (!iter.hasNext() && lastProcessedTransactionId != -1) {
/*
* We started from in-between and reached end of the table,
* now we should go to the start of the table and process
* the transactions.
*/
iter.seekToFirst();
}
}
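// Remember where the scan stopped so the next call resumes from here;
// -1 means start from the beginning of the table.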
lastProcessedTransactionId = keyValue != null ? keyValue.getKey() : -1;
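// Purge transactions for deleted or missing containers and count them
// as completed.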
if (!txIDs.isEmpty()) {
deletedBlockLogStateManager.removeTransactionsFromDB(txIDs);
metrics.incrBlockDeletionTransactionCompleted(txIDs.size());
}
}
return transactions;
} finally {
lock.unlock();
}
}
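A minimal usage sketch, for illustration only: the helper name fetchBatch, its parameters, and the 20000 block limit are assumptions rather than part of this file; it relies on the file's existing imports and simply shows how a caller could pull one bounded batch of deletion work from the log.

/*
 * Hypothetical helper (illustrative sketch, not part of DeletedBlockLogImpl):
 * drains one bounded batch of deletion transactions from the log for the
 * given set of healthy datanodes.
 */
private static DatanodeDeletedBlockTransactions fetchBatch(
    DeletedBlockLog deletedBlockLog, Set<DatanodeDetails> healthyDatanodes)
    throws IOException {
  // Illustrative per-call cap on the number of blocks scheduled for deletion.
  final int blockDeletionLimit = 20000;
  return deletedBlockLog.getTransactions(blockDeletionLimit, healthyDatanodes);
}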