public void handle()

in server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java [130:194]


    public void handle(Promise<RestoreRange> event)
    {
        this.taskStartTimeNanos = restoreJobUtil.currentTimeNanos();

        // exit early if the range has been cancelled already; the same check is performed at many following steps to avoid wasting computation
        failOnCancelled(range, null)
        // The range, when being process, requires a total of range size (download) + uncompressed (unzip) to use.
        // The protection below guards the range being process, if the usable disk space falls below the threshold
        // after considering the range
        .compose(v -> ensureSufficientStorage(range.stageDirectory().toString(),
                                              range.estimatedSpaceRequiredInBytes(),
                                              requiredUsableSpacePercentage,
                                              executorPool))
        .compose(v -> failOnCancelled(range, v))
        .compose(v -> {
            RestoreJob job = range.job();
            if (job.isManagedBySidecar())
            {
                if (job.status == RestoreJobStatus.STAGE_READY)
                {
                    if (Files.exists(range.stagedObjectPath()))
                    {
                        LOGGER.debug("The slice has been staged already. sliceKey={} stagedFilePath={}",
                                     range.sliceKey(), range.stagedObjectPath());
                        range.completeStagePhase(); // update the flag if missed
                        rangeDatabaseAccessor.updateStatus(range);
                        return Future.succeededFuture();
                    }

                    // 1. check object existence and validate eTag / checksum
                    return failOnCancelled(range, null)
                           .compose(value -> checkObjectExistence())
                           .compose(value -> failOnCancelled(range, value))
                           .compose(headObject -> downloadSlice())
                           .compose(value -> failOnCancelled(range, value))
                           .compose(file -> {
                               // completed staging. A new task is produced when it comes to import
                               range.completeStagePhase();
                               rangeDatabaseAccessor.updateStatus(range);
                               return Future.succeededFuture();
                           });
                }
                else if (job.status == RestoreJobStatus.IMPORT_READY)
                {
                    return unzipAndImport(range.stagedObjectPath().toFile(),
                                          // persist status
                                          () -> rangeDatabaseAccessor.updateStatus(range));
                }
                else
                {
                    String msg = "Unexpected restore job status. Expected only STAGE_READY or IMPORT_READY when " +
                                 "processing active slices. Found status: " + job.statusWithOptionalDescription();
                    Exception unexpectedState = new IllegalStateException(msg);
                    return Future.failedFuture(RestoreJobExceptions.ofFatal("Unexpected restore job status",
                                                                            range, unexpectedState));
                }
            }
            else
            {
                return downloadSliceAndImport();
            }
        })
        .onSuccess(v -> event.tryComplete(range))
        .onFailure(cause -> event.tryFail(RestoreJobExceptions.propagate(cause)));
    }