public void execute()

in server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java [159:206]


    public void execute(Promise<Void> promise)
    {
        while (workQueue.peek() != null // exit early when no pending slice and avoid acquire permits
               && processMaxConcurrency.tryAcquire())
        {
            RestoreRange range = workQueue.poll();
            if (range == null) // It should never happen because it peeks before polling. It is only to make IDE happy
            {
                processMaxConcurrency.releasePermit();
                break; // break in order to complete promise
            }

            // only create task when the restore job status is import ready for the staged ranges
            // otherwise, putting the range back to the staged queue and exit early.
            if (range.hasStaged() && range.job().status != RestoreJobStatus.IMPORT_READY)
            {
                workQueue.offerStaged(range);
                processMaxConcurrency.releasePermit();
                break;
            }

            // capture the new queue length after polling
            workQueue.captureImportQueueLength();
            RestoreRangeHandler task = range.toAsyncTask(s3ClientPool, pool, importer,
                                                         requiredUsableSpacePercentage,
                                                         rangeDatabaseAccessor,
                                                         restoreJobUtil,
                                                         localTokenRangesProvider,
                                                         metrics);

            activeTasks.put(task, slowTaskThreshold.toSeconds());
            pool.executeBlocking(task, false) // unordered; run in parallel
                // wrap success/failure handling in compose to catch any exception thrown
                .compose(taskSuccessHandler(task),
                         taskFailureHandler(range))
                // release counter
                .onComplete(ignored -> {
                    processMaxConcurrency.releasePermit();
                    // decrement the active slices and capture the new queue length
                    workQueue.decrementActiveSliceCount(range);
                    workQueue.captureImportQueueLength();
                    activeTasks.remove(task);
                });
        }
        checkForLongRunningTasks();
        workQueue.capturePendingSliceCount();
        promise.tryComplete();
    }