in sdk/storage/Azure.Storage.Blobs/src/PartitionedDownloader.cs [138:371]
public async Task<Response> DownloadToInternal(
Stream destination,
BlobRequestConditions conditions,
bool async,
CancellationToken cancellationToken)
{
// Wrap the download range calls in a Download span for distributed
// tracing
DiagnosticScope scope = _client.ClientConfiguration.ClientDiagnostics.CreateScope(_operationName);
using DisposableBucket disposables = new DisposableBucket();
Queue<Task<Response<BlobDownloadStreamingResult>>> runningTasks = null;
try
{
scope.Start();
// Just start downloading using an initial range. If it's a
// small blob, we'll get the whole thing in one shot. If it's
// a large blob, we'll get its full size in Content-Range and
// can keep downloading it in segments.
var initialRange = new HttpRange(0, _initialRangeSize);
Response<BlobDownloadStreamingResult> initialResponse;
try
{
initialResponse = await _client.DownloadStreamingInternal(
initialRange,
conditions,
ValidationOptions,
_progress,
_innerOperationName,
async,
cancellationToken).ConfigureAwait(false);
}
catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.InvalidRange)
{
initialResponse = await _client.DownloadStreamingInternal(
range: default,
conditions,
ValidationOptions,
_progress,
_innerOperationName,
async,
cancellationToken).ConfigureAwait(false);
}
// If the initial request returned no content (i.e., a 304),
// we'll pass that back to the user immediately
if (initialResponse.IsUnavailable())
{
return initialResponse.GetRawResponse();
}
// Destination wrapped in decrypt step if needed (determined by initial response)
if (_client.UsingClientSideEncryption)
{
if (initialResponse.Value.Details.Metadata.TryGetValue(Constants.ClientSideEncryption.EncryptionDataKey, out string rawEncryptiondata))
{
destination = await new BlobClientSideDecryptor(
new ClientSideDecryptor(_client.ClientSideEncryption)).DecryptWholeBlobWriteInternal(
destination,
initialResponse.Value.Details.Metadata,
async,
cancellationToken).ConfigureAwait(false);
}
}
// Destination wrapped in master crc step if needed (must wait until after encryption wrap check)
Memory<byte> composedCrc = default;
if (UseMasterCrc)
{
_masterCrcCalculator = StorageCrc64HashAlgorithm.Create();
destination = ChecksumCalculatingStream.GetWriteStream(destination, _masterCrcCalculator.Append);
disposables.Add(_arrayPool.RentAsMemoryDisposable(
Constants.StorageCrc64SizeInBytes, out composedCrc));
composedCrc.Span.Clear();
}
// If the first segment was the entire blob, we'll copy that to
// the output stream and finish now
long initialLength = initialResponse.Value.Details.ContentLength;
long totalLength = ParseRangeTotalLength(initialResponse.Value.Details.ContentRange);
if (initialLength == totalLength)
{
await HandleOneShotDownload(initialResponse, destination, async, cancellationToken)
.ConfigureAwait(false);
return initialResponse.GetRawResponse();
}
// Capture the etag from the first segment and construct
// conditions to ensure the blob doesn't change while we're
// downloading the remaining segments
ETag etag = initialResponse.Value.Details.ETag;
BlobRequestConditions conditionsWithEtag = conditions?.WithIfMatch(etag) ?? new BlobRequestConditions { IfMatch = etag };
#pragma warning disable AZC0110 // DO NOT use await keyword in possibly synchronous scope.
// Rule checker cannot understand this section, but this
// massively reduces code duplication.
int effectiveWorkerCount = async ? _maxWorkerCount : 1;
if (effectiveWorkerCount > 1)
{
runningTasks = new();
runningTasks.Enqueue(Task.FromResult(initialResponse));
}
else
{
using (_arrayPool.RentAsMemoryDisposable(_checksumSize, out Memory<byte> partitionChecksum))
{
await CopyToInternal(initialResponse, destination, partitionChecksum, async, cancellationToken).ConfigureAwait(false);
if (UseMasterCrc)
{
StorageCrc64Composer.Compose(
(composedCrc.ToArray(), 0L),
(partitionChecksum.ToArray(), initialResponse.Value.Details.ContentLength)
).CopyTo(composedCrc);
}
}
}
// Fill the queue with tasks to download each of the remaining
// ranges in the blob
foreach (HttpRange httpRange in GetRanges(initialLength, totalLength))
{
ValueTask<Response<BlobDownloadStreamingResult>> responseValueTask = _client
.DownloadStreamingInternal(
httpRange,
conditionsWithEtag,
ValidationOptions,
_progress,
_innerOperationName,
async,
cancellationToken);
if (runningTasks != null)
{
// Add the next Task (which will start the download but
// return before it's completed downloading)
runningTasks.Enqueue(responseValueTask.AsTask());
// If we have fewer tasks than alotted workers, then just
// continue adding tasks until we have effectiveWorkerCount
// running in parallel
if (runningTasks.Count < effectiveWorkerCount)
{
continue;
}
// Once all the workers are busy, wait for the first
// segment to finish downloading before we create more work
await ConsumeQueuedTask().ConfigureAwait(false);
}
else
{
Response<BlobDownloadStreamingResult> result = await responseValueTask.ConfigureAwait(false);
using (_arrayPool.RentAsMemoryDisposable(_checksumSize, out Memory<byte> partitionChecksum))
{
await CopyToInternal(result, destination, partitionChecksum, async, cancellationToken).ConfigureAwait(false);
if (UseMasterCrc)
{
StorageCrc64Composer.Compose(
(composedCrc.ToArray(), 0L),
(partitionChecksum.ToArray(), result.Value.Details.ContentLength)
).CopyTo(composedCrc);
}
}
}
}
// Wait for all of the remaining segments to download
if (runningTasks != null)
{
while (runningTasks.Count > 0)
{
await ConsumeQueuedTask().ConfigureAwait(false);
}
}
#pragma warning restore AZC0110 // DO NOT use await keyword in possibly synchronous scope.
await FinalizeDownloadInternal(destination, composedCrc, async, cancellationToken)
.ConfigureAwait(false);
return initialResponse.GetRawResponse();
// Wait for the first segment in the queue of tasks to complete
// downloading and copy it to the destination stream
async Task ConsumeQueuedTask()
{
// Don't need to worry about 304s here because the ETag
// condition will turn into a 412 and throw a proper
// RequestFailedException
Response<BlobDownloadStreamingResult> response =
await runningTasks.Dequeue().ConfigureAwait(false);
// Even though the BlobDownloadInfo is returned immediately,
// CopyToAsync causes ConsumeQueuedTask to wait until the
// download is complete
using (_arrayPool.RentAsMemoryDisposable(_checksumSize, out Memory<byte> partitionChecksum))
{
await CopyToInternal(
response,
destination,
partitionChecksum,
async,
cancellationToken)
.ConfigureAwait(false);
if (UseMasterCrc)
{
StorageCrc64Composer.Compose(
(composedCrc.ToArray(), 0L),
(partitionChecksum.ToArray(), response.Value.Details.ContentLength)
).CopyTo(composedCrc);
}
}
}
}
catch (Exception ex)
{
scope.Failed(ex);
throw;
}
finally
{
#pragma warning disable AZC0110
if (runningTasks != null)
{
async Task DisposeStreamAsync(Task<Response<BlobDownloadStreamingResult>> task)
{
Response<BlobDownloadStreamingResult> response = await task.ConfigureAwait(false);
response.Value.Content.Dispose();
}
await Task.WhenAll(runningTasks.Select(DisposeStreamAsync)).ConfigureAwait(false);
}
#pragma warning restore AZC0110
scope.Dispose();
}
}