private void DoResumableDownloadMultiThread()

in sdk/Common/ResumableDownloadManager.cs [199:285]


        /// <summary>
        /// Downloads the parts listed in <paramref name="resumableContext"/> using several concurrent
        /// part tasks, then validates the result if no part task reported an error.
        /// </summary>
        /// <param name="request">
        /// The download request. The context's ETag is added to its matching-ETag constraints when absent;
        /// its <c>ParallelThreadCount</c> bounds the concurrency and its <c>StreamTransferProgress</c>
        /// handler (if any) receives the progress events raised here.
        /// </param>
        /// <param name="resumableContext">
        /// Supplies the part list, the ETag and the downloaded/total byte counts. It is dumped each time
        /// a part task is observed to finish without error while further parts are still being scheduled.
        /// </param>
        /// <param name="downloadProgressCallback">Handler passed through to every part task.</param>
        /// <remarks>
        /// An error recorded by any part task is rethrown only after every in-flight task has finished
        /// and all completion events have been closed. Scheduling of the remaining parts continues
        /// after a part fails.
        /// </remarks>
        private void DoResumableDownloadMultiThread(DownloadObjectRequest request, ResumableDownloadContext resumableContext,
                                                    EventHandler<StreamTransferProgressArgs> downloadProgressCallback)
        {
            // WaitHandle.WaitAny rejects arrays with more than 64 handles, so never run more slots than that.
            const int MaxWaitHandles = 64;

            _downloadedBytes = resumableContext.GetDownloadedBytes();
            if (!request.MatchingETagConstraints.Contains(resumableContext.ETag))
            {
                request.MatchingETagConstraints.Add(resumableContext.ETag);
            }

            Exception e = null;
            int parallel = Math.Min(Math.Min(request.ParallelThreadCount, resumableContext.PartContextList.Count), Environment.ProcessorCount);
            parallel = Math.Min(parallel, MaxWaitHandles);

            // One entry per worker slot: its completion event, its task state, and the index of the
            // part it is currently downloading. A slot is reused for a new part once its task finishes.
            ManualResetEvent[] taskFinishedEvents = new ManualResetEvent[parallel];
            DownloadTaskParam[] taskParams = new DownloadTaskParam[parallel];
            int[] slotPartIndexes = new int[parallel];

            int nextPart = 0;
            for (nextPart = 0; nextPart < parallel; nextPart++)
            {
                var part = resumableContext.PartContextList[nextPart];
                taskParams[nextPart] = StartDownloadPartTask(request, part, downloadProgressCallback);
                taskFinishedEvents[nextPart] = taskParams[nextPart].DownloadFinished;
                slotPartIndexes[nextPart] = nextPart;
            }

            bool allTasksDone = false;

            try
            {
                long totalBytes = resumableContext.GetTotalBytes();
                long lastDownloadedBytes = _downloadedBytes;
                while (nextPart < resumableContext.PartContextList.Count)
                {
                    int index = WaitHandle.WaitAny(taskFinishedEvents);

                    // 'index' identifies the slot; the part it was running must be looked up separately,
                    // because after the first reuse a slot no longer holds the part with the same index.
                    int finishedPart = slotPartIndexes[index];
                    if (taskParams[index].Error == null)
                    {
                        if (request.StreamTransferProgress != null)
                        {
                            lastDownloadedBytes = _downloadedBytes;
                            StreamTransferProgressArgs args = new StreamTransferProgressArgs(resumableContext.PartContextList[finishedPart].Length, lastDownloadedBytes, totalBytes);
                            request.StreamTransferProgress.Invoke(this, args);
                        }

                        resumableContext.Dump();
                    }
                    else
                    {
                        e = taskParams[index].Error;
                    }

                    // Start the replacement task before closing the finished event. If starting throws,
                    // the slot still holds a valid (signaled) event and its task state, so the cleanup in
                    // the finally block can wait on it, close it and pick up its error.
                    ManualResetEvent finishedEvent = taskFinishedEvents[index];
                    int partToStart = nextPart++;
                    DownloadTaskParam replacement = StartDownloadPartTask(request, resumableContext.PartContextList[partToStart], downloadProgressCallback);
                    taskParams[index] = replacement;
                    taskFinishedEvents[index] = replacement.DownloadFinished;
                    slotPartIndexes[index] = partToStart;
                    finishedEvent.Close();
                }

                WaitForAllDownloadTasks(taskFinishedEvents);
                allTasksDone = true;

                if (request.StreamTransferProgress != null)
                {
                    // Report whatever was downloaded since the last progress event raised above.
                    StreamTransferProgressArgs args = new StreamTransferProgressArgs(_downloadedBytes - lastDownloadedBytes, _downloadedBytes, totalBytes);
                    request.StreamTransferProgress.Invoke(this, args);
                }

                if (e == null)
                {
                    Validate(request, resumableContext);
                }
            }
            finally
            {
                // Reached with tasks still running when the try block threw; wait for them before
                // closing their events.
                if (!allTasksDone)
                {
                    WaitForAllDownloadTasks(taskFinishedEvents);
                }

                for (int i = 0; i < parallel; i++)
                {
                    taskFinishedEvents[i].Close();
                    if (taskParams[i].Error != null)
                    {
                        e = taskParams[i].Error;
                    }
                }

                if (e != null)
                {
                    // NOTE(review): rethrowing a stored exception with 'throw e' replaces its stack trace,
                    // and throwing from finally supersedes any exception already propagating from the try
                    // block. Kept as-is so callers observe the same exception as before.
                    throw e;
                }
            }
        }

        /// <summary>
        /// Blocks until every event in <paramref name="events"/> has been signaled.
        /// </summary>
        /// <param name="events">Completion events of the part tasks currently occupying the worker slots.</param>
        private static void WaitForAllDownloadTasks(ManualResetEvent[] events)
        {
            // Waits handle by handle instead of calling WaitHandle.WaitAll, which throws on STA threads
            // when given more than one handle, on more than 64 handles, and on an empty array.
            // Equivalent to WaitAll as long as the events are not reset after being set
            // (assumed of the part tasks — TODO confirm against StartDownloadPartTask).
            foreach (ManualResetEvent finished in events)
            {
                finished.WaitOne();
            }
        }