in server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreRangeTask.java [130:194]
public void handle(Promise<RestoreRange> event)
{
this.taskStartTimeNanos = restoreJobUtil.currentTimeNanos();
// exit early if the range has been cancelled already; the same check is performed at many following steps to avoid wasting computation
failOnCancelled(range, null)
// The range, when being process, requires a total of range size (download) + uncompressed (unzip) to use.
// The protection below guards the range being process, if the usable disk space falls below the threshold
// after considering the range
.compose(v -> ensureSufficientStorage(range.stageDirectory().toString(),
range.estimatedSpaceRequiredInBytes(),
requiredUsableSpacePercentage,
executorPool))
.compose(v -> failOnCancelled(range, v))
.compose(v -> {
RestoreJob job = range.job();
if (job.isManagedBySidecar())
{
if (job.status == RestoreJobStatus.STAGE_READY)
{
if (Files.exists(range.stagedObjectPath()))
{
LOGGER.debug("The slice has been staged already. sliceKey={} stagedFilePath={}",
range.sliceKey(), range.stagedObjectPath());
range.completeStagePhase(); // update the flag if missed
rangeDatabaseAccessor.updateStatus(range);
return Future.succeededFuture();
}
// 1. check object existence and validate eTag / checksum
return failOnCancelled(range, null)
.compose(value -> checkObjectExistence())
.compose(value -> failOnCancelled(range, value))
.compose(headObject -> downloadSlice())
.compose(value -> failOnCancelled(range, value))
.compose(file -> {
// completed staging. A new task is produced when it comes to import
range.completeStagePhase();
rangeDatabaseAccessor.updateStatus(range);
return Future.succeededFuture();
});
}
else if (job.status == RestoreJobStatus.IMPORT_READY)
{
return unzipAndImport(range.stagedObjectPath().toFile(),
// persist status
() -> rangeDatabaseAccessor.updateStatus(range));
}
else
{
String msg = "Unexpected restore job status. Expected only STAGE_READY or IMPORT_READY when " +
"processing active slices. Found status: " + job.statusWithOptionalDescription();
Exception unexpectedState = new IllegalStateException(msg);
return Future.failedFuture(RestoreJobExceptions.ofFatal("Unexpected restore job status",
range, unexpectedState));
}
}
else
{
return downloadSliceAndImport();
}
})
.onSuccess(v -> event.tryComplete(range))
.onFailure(cause -> event.tryFail(RestoreJobExceptions.propagate(cause)));
}