private Future rangeGetObject()

in server/src/main/java/org/apache/cassandra/sidecar/restore/StorageClient.java [207:269]


    private Future<File> rangeGetObject(RestoreRange range, Credentials credentials, Path destinationPath, TaskExecutorPool taskExecutorPool)
    {
        HttpRangesIterator iterator = new HttpRangesIterator(range.sliceObjectLength(), rangeHeaderSize);
        Preconditions.checkState(iterator.hasNext(), "SliceObject is empty. sliceKey=" + range.sliceKey());

        SeekableByteChannel seekableByteChannel;
        try
        {
            seekableByteChannel = Files.newByteChannel(destinationPath, EnumSet.of(CREATE_NEW, WRITE));
        }
        catch (IOException e)
        {
            LOGGER.error("Failed to create file channel for downloading. jobId={} sliceKey={}",
                         range.jobId(), range.sliceKey(), e);
            return Future.failedFuture(e);
        }
        Future<SeekableByteChannel> channelFuture = Future.succeededFuture(seekableByteChannel);
        while (iterator.hasNext())
        {
            HttpRange httpRange = iterator.next();
            // Schedule each part download in the taskExecutorPool with ordered == true.
            // Parts are downloaded one by one in sequence.
            channelFuture = channelFuture.compose(channel -> taskExecutorPool.executeBlocking(() -> {
                // the length is guaranteed to be no greater than rangeHeaderSize (int)
                int actualRangeSize = (int) httpRange.length();
                // throttle the download throughput
                downloadRateLimiter.acquire(actualRangeSize);
                // https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
                GetObjectRequest request =
                GetObjectRequest.builder()
                                .overrideConfiguration(b -> b.credentialsProvider(credentials.awsCredentialsProvider()))
                                .bucket(range.sliceBucket())
                                .key(range.sliceKey())
                                .range(httpRange.toString())
                                .build();
                // note: it is a blocking get; No parallelism in getting the ranges of the same object
                ResponseBytes<GetObjectResponse> bytes = client.getObject(request, AsyncResponseTransformer.toBytes()).get();
                channel.write(bytes.asByteBuffer());
                return channel;
            }, true));
        }
        return channelFuture
               // eventually is evaluated in both success and failure cases
               .eventually(() -> taskExecutorPool.runBlocking(() -> {
                   ThrowableUtils.propagate(() -> closeChannel(seekableByteChannel));
               }, true))
               .compose(channel -> Future.succeededFuture(destinationPath.toFile()),
                        failure -> { // failure mapper; log the credential on failure
                            LOGGER.error("Request is not successful. jobId={} credentials={}",
                                         range.jobId(), credentials.readCredentials, failure);
                            try
                            {
                                Files.deleteIfExists(destinationPath);
                            }
                            catch (IOException e)
                            {
                                LOGGER.warn("Failed to clean up the failed download. jobId={} sliceKey={}",
                                            range.jobId(), range.sliceKey(), e);
                                failure.addSuppressed(e);
                            }
                            return Future.failedFuture(failure);
                        });
    }