in server/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java [207:269]
/**
 * Downloads the slice object of the given range into {@code destinationPath} by issuing a sequence of
 * ranged GetObject requests.
 *
 * <p>The destination file is opened with {@code CREATE_NEW} and {@code WRITE}; if the channel cannot be
 * opened, a failed future is returned and nothing is downloaded. Each range produced by
 * {@link HttpRangesIterator} is fetched in its own blocking task on {@code taskExecutorPool}. The tasks
 * are chained, so a range is requested only after the previous one has been written to the file.
 * The channel is closed in both the success and the failure case. On failure, the destination file is
 * deleted on a best-effort basis; a failure to delete is logged and attached to the original failure as
 * a suppressed exception.
 *
 * @param range            restore range that supplies the job id, slice bucket, slice key and object length
 * @param credentials      credentials whose AWS credentials provider is applied to every GetObject request
 * @param destinationPath  path of the file to create and write the downloaded bytes to
 * @param taskExecutorPool executor pool used to run the blocking download and close operations
 * @return a future that completes with the downloaded file, or fails with the cause of the failed download
 * @throws IllegalStateException presumably (via {@code Preconditions.checkState}) when the slice object
 *                               yields no range to download
 */
private Future<File> rangeGetObject(RestoreRange range, Credentials credentials, Path destinationPath, TaskExecutorPool taskExecutorPool)
{
    HttpRangesIterator iterator = new HttpRangesIterator(range.sliceObjectLength(), rangeHeaderSize);
    Preconditions.checkState(iterator.hasNext(), "SliceObject is empty. sliceKey=" + range.sliceKey());
    SeekableByteChannel seekableByteChannel;
    try
    {
        seekableByteChannel = Files.newByteChannel(destinationPath, EnumSet.of(CREATE_NEW, WRITE));
    }
    catch (IOException e)
    {
        LOGGER.error("Failed to create file channel for downloading. jobId={} sliceKey={}",
                     range.jobId(), range.sliceKey(), e);
        return Future.failedFuture(e);
    }
    Future<SeekableByteChannel> channelFuture = Future.succeededFuture(seekableByteChannel);
    while (iterator.hasNext())
    {
        HttpRange httpRange = iterator.next();
        // Schedule each part download in the taskExecutorPool with ordered == true.
        // Parts are downloaded one by one in sequence.
        channelFuture = channelFuture.compose(channel -> taskExecutorPool.executeBlocking(() -> {
            // the length is guaranteed to be no greater than rangeHeaderSize (int)
            int actualRangeSize = (int) httpRange.length();
            // throttle the download throughput
            downloadRateLimiter.acquire(actualRangeSize);
            // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
            GetObjectRequest request =
            GetObjectRequest.builder()
                            .overrideConfiguration(b -> b.credentialsProvider(credentials.awsCredentialsProvider()))
                            .bucket(range.sliceBucket())
                            .key(range.sliceKey())
                            .range(httpRange.toString())
                            .build();
            // note: it is a blocking get; No parallelism in getting the ranges of the same object
            ResponseBytes<GetObjectResponse> bytes = client.getObject(request, AsyncResponseTransformer.toBytes()).get();
            // A single write() call is allowed to consume only part of the buffer. Keep writing until
            // the buffer is drained; otherwise a short write would silently truncate this range and
            // shift the file offset of every subsequent range.
            java.nio.ByteBuffer buffer = bytes.asByteBuffer();
            while (buffer.hasRemaining())
            {
                channel.write(buffer);
            }
            return channel;
        }, true));
    }
    return channelFuture
           // eventually is evaluated in both success and failure cases
           .eventually(() -> taskExecutorPool.runBlocking(() -> {
               ThrowableUtils.propagate(() -> closeChannel(seekableByteChannel));
           }, true))
           .compose(channel -> Future.succeededFuture(destinationPath.toFile()),
                    failure -> { // failure mapper; log the credential on failure
                        // NOTE(review): confirm that readCredentials does not render secret material when logged
                        LOGGER.error("Request is not successful. jobId={} credentials={}",
                                     range.jobId(), credentials.readCredentials, failure);
                        try
                        {
                            // remove the incomplete file so that it is not mistaken for a finished download
                            Files.deleteIfExists(destinationPath);
                        }
                        catch (IOException e)
                        {
                            LOGGER.warn("Failed to clean up the failed download. jobId={} sliceKey={}",
                                        range.jobId(), range.sliceKey(), e);
                            failure.addSuppressed(e);
                        }
                        return Future.failedFuture(failure);
                    });
}