in server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java [159:206]
public void execute(Promise<Void> promise)
{
while (workQueue.peek() != null // exit early when no pending slice and avoid acquire permits
&& processMaxConcurrency.tryAcquire())
{
RestoreRange range = workQueue.poll();
if (range == null) // It should never happen because it peeks before polling. It is only to make IDE happy
{
processMaxConcurrency.releasePermit();
break; // break in order to complete promise
}
// only create task when the restore job status is import ready for the staged ranges
// otherwise, putting the range back to the staged queue and exit early.
if (range.hasStaged() && range.job().status != RestoreJobStatus.IMPORT_READY)
{
workQueue.offerStaged(range);
processMaxConcurrency.releasePermit();
break;
}
// capture the new queue length after polling
workQueue.captureImportQueueLength();
RestoreRangeHandler task = range.toAsyncTask(s3ClientPool, pool, importer,
requiredUsableSpacePercentage,
rangeDatabaseAccessor,
restoreJobUtil,
localTokenRangesProvider,
metrics);
activeTasks.put(task, slowTaskThreshold.toSeconds());
pool.executeBlocking(task, false) // unordered; run in parallel
// wrap success/failure handling in compose to catch any exception thrown
.compose(taskSuccessHandler(task),
taskFailureHandler(range))
// release counter
.onComplete(ignored -> {
processMaxConcurrency.releasePermit();
// decrement the active slices and capture the new queue length
workQueue.decrementActiveSliceCount(range);
workQueue.captureImportQueueLength();
activeTasks.remove(task);
});
}
checkForLongRunningTasks();
workQueue.capturePendingSliceCount();
promise.tryComplete();
}