public async Task DownloadToInternal()

in sdk/storage/Azure.Storage.Blobs/src/PartitionedDownloader.cs [138:371]


        public async Task<Response> DownloadToInternal(
            Stream destination,
            BlobRequestConditions conditions,
            bool async,
            CancellationToken cancellationToken)
        {
            // Wrap the download range calls in a Download span for distributed
            // tracing
            DiagnosticScope scope = _client.ClientConfiguration.ClientDiagnostics.CreateScope(_operationName);
            using DisposableBucket disposables = new DisposableBucket();
            Queue<Task<Response<BlobDownloadStreamingResult>>> runningTasks = null;
            try
            {
                scope.Start();

                // Just start downloading using an initial range.  If it's a
                // small blob, we'll get the whole thing in one shot.  If it's
                // a large blob, we'll get its full size in Content-Range and
                // can keep downloading it in segments.
                var initialRange = new HttpRange(0, _initialRangeSize);
                Response<BlobDownloadStreamingResult> initialResponse;

                try
                {
                    initialResponse = await _client.DownloadStreamingInternal(
                        initialRange,
                        conditions,
                        ValidationOptions,
                        _progress,
                        _innerOperationName,
                        async,
                        cancellationToken).ConfigureAwait(false);
                }
                catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.InvalidRange)
                {
                    initialResponse = await _client.DownloadStreamingInternal(
                        range: default,
                        conditions,
                        ValidationOptions,
                        _progress,
                        _innerOperationName,
                        async,
                        cancellationToken).ConfigureAwait(false);
                }

                // If the initial request returned no content (i.e., a 304),
                // we'll pass that back to the user immediately
                if (initialResponse.IsUnavailable())
                {
                    return initialResponse.GetRawResponse();
                }

                // Destination wrapped in decrypt step if needed (determined by initial response)
                if (_client.UsingClientSideEncryption)
                {
                    if (initialResponse.Value.Details.Metadata.TryGetValue(Constants.ClientSideEncryption.EncryptionDataKey, out string rawEncryptiondata))
                    {
                        destination = await new BlobClientSideDecryptor(
                            new ClientSideDecryptor(_client.ClientSideEncryption)).DecryptWholeBlobWriteInternal(
                                destination,
                                initialResponse.Value.Details.Metadata,
                                async,
                                cancellationToken).ConfigureAwait(false);
                    }
                }

                // Destination wrapped in master crc step if needed (must wait until after encryption wrap check)
                Memory<byte> composedCrc = default;
                if (UseMasterCrc)
                {
                    _masterCrcCalculator = StorageCrc64HashAlgorithm.Create();
                    destination = ChecksumCalculatingStream.GetWriteStream(destination, _masterCrcCalculator.Append);
                    disposables.Add(_arrayPool.RentAsMemoryDisposable(
                        Constants.StorageCrc64SizeInBytes, out composedCrc));
                    composedCrc.Span.Clear();
                }

                // If the first segment was the entire blob, we'll copy that to
                // the output stream and finish now
                long initialLength = initialResponse.Value.Details.ContentLength;
                long totalLength = ParseRangeTotalLength(initialResponse.Value.Details.ContentRange);
                if (initialLength == totalLength)
                {
                    await HandleOneShotDownload(initialResponse, destination, async, cancellationToken)
                        .ConfigureAwait(false);
                    return initialResponse.GetRawResponse();
                }

                // Capture the etag from the first segment and construct
                // conditions to ensure the blob doesn't change while we're
                // downloading the remaining segments
                ETag etag = initialResponse.Value.Details.ETag;
                BlobRequestConditions conditionsWithEtag = conditions?.WithIfMatch(etag) ?? new BlobRequestConditions { IfMatch = etag };

#pragma warning disable AZC0110 // DO NOT use await keyword in possibly synchronous scope.
                                // Rule checker cannot understand this section, but this
                                // massively reduces code duplication.
                int effectiveWorkerCount = async ? _maxWorkerCount : 1;
                if (effectiveWorkerCount > 1)
                {
                    runningTasks = new();
                    runningTasks.Enqueue(Task.FromResult(initialResponse));
                }
                else
                {
                    using (_arrayPool.RentAsMemoryDisposable(_checksumSize, out Memory<byte> partitionChecksum))
                    {
                        await CopyToInternal(initialResponse, destination, partitionChecksum, async, cancellationToken).ConfigureAwait(false);
                        if (UseMasterCrc)
                        {
                            StorageCrc64Composer.Compose(
                                (composedCrc.ToArray(), 0L),
                                (partitionChecksum.ToArray(), initialResponse.Value.Details.ContentLength)
                            ).CopyTo(composedCrc);
                        }
                    }
                }

                // Fill the queue with tasks to download each of the remaining
                // ranges in the blob
                foreach (HttpRange httpRange in GetRanges(initialLength, totalLength))
                {
                    ValueTask<Response<BlobDownloadStreamingResult>> responseValueTask = _client
                        .DownloadStreamingInternal(
                            httpRange,
                            conditionsWithEtag,
                            ValidationOptions,
                            _progress,
                            _innerOperationName,
                            async,
                            cancellationToken);
                    if (runningTasks != null)
                    {
                        // Add the next Task (which will start the download but
                        // return before it's completed downloading)
                        runningTasks.Enqueue(responseValueTask.AsTask());

                        // If we have fewer tasks than alotted workers, then just
                        // continue adding tasks until we have effectiveWorkerCount
                        // running in parallel
                        if (runningTasks.Count < effectiveWorkerCount)
                        {
                            continue;
                        }

                        // Once all the workers are busy, wait for the first
                        // segment to finish downloading before we create more work
                        await ConsumeQueuedTask().ConfigureAwait(false);
                    }
                    else
                    {
                        Response<BlobDownloadStreamingResult> result = await responseValueTask.ConfigureAwait(false);
                        using (_arrayPool.RentAsMemoryDisposable(_checksumSize, out Memory<byte> partitionChecksum))
                        {
                            await CopyToInternal(result, destination, partitionChecksum, async, cancellationToken).ConfigureAwait(false);
                            if (UseMasterCrc)
                            {
                                StorageCrc64Composer.Compose(
                                    (composedCrc.ToArray(), 0L),
                                    (partitionChecksum.ToArray(), result.Value.Details.ContentLength)
                                ).CopyTo(composedCrc);
                            }
                        }
                    }
                }

                // Wait for all of the remaining segments to download
                if (runningTasks != null)
                {
                    while (runningTasks.Count > 0)
                    {
                        await ConsumeQueuedTask().ConfigureAwait(false);
                    }
                }
#pragma warning restore AZC0110 // DO NOT use await keyword in possibly synchronous scope.

                await FinalizeDownloadInternal(destination, composedCrc, async, cancellationToken)
                    .ConfigureAwait(false);
                return initialResponse.GetRawResponse();

                // Wait for the first segment in the queue of tasks to complete
                // downloading and copy it to the destination stream
                async Task ConsumeQueuedTask()
                {
                    // Don't need to worry about 304s here because the ETag
                    // condition will turn into a 412 and throw a proper
                    // RequestFailedException
                    Response<BlobDownloadStreamingResult> response =
                        await runningTasks.Dequeue().ConfigureAwait(false);

                    // Even though the BlobDownloadInfo is returned immediately,
                    // CopyToAsync causes ConsumeQueuedTask to wait until the
                    // download is complete

                    using (_arrayPool.RentAsMemoryDisposable(_checksumSize, out Memory<byte> partitionChecksum))
                    {
                        await CopyToInternal(
                            response,
                            destination,
                            partitionChecksum,
                            async,
                            cancellationToken)
                            .ConfigureAwait(false);
                            if (UseMasterCrc)
                            {
                                StorageCrc64Composer.Compose(
                                    (composedCrc.ToArray(), 0L),
                                    (partitionChecksum.ToArray(), response.Value.Details.ContentLength)
                                ).CopyTo(composedCrc);
                            }
                    }
                }
            }
            catch (Exception ex)
            {
                scope.Failed(ex);
                throw;
            }
            finally
            {
#pragma warning disable AZC0110
                if (runningTasks != null)
                {
                    async Task DisposeStreamAsync(Task<Response<BlobDownloadStreamingResult>> task)
                    {
                        Response<BlobDownloadStreamingResult> response = await task.ConfigureAwait(false);
                        response.Value.Content.Dispose();
                    }
                    await Task.WhenAll(runningTasks.Select(DisposeStreamAsync)).ConfigureAwait(false);
                }
#pragma warning restore AZC0110
                scope.Dispose();
            }
        }