in sdk/Common/ResumableDownloadManager.cs [199:285]
/// <summary>
/// Downloads the parts listed in <paramref name="resumableContext"/> with several part tasks in flight
/// at once, reporting progress and dumping the context as parts finish.
/// </summary>
/// <param name="request">
/// The download request. The context's ETag is added to its matching-ETag constraints when absent.
/// </param>
/// <param name="resumableContext">Supplies the part list, the byte counters and the ETag of the download.</param>
/// <param name="downloadProgressCallback">Progress handler passed unchanged to every part task.</param>
/// <remarks>
/// The number of concurrent tasks is the smallest of <c>request.ParallelThreadCount</c>, the processor
/// count, the part count and the wait-handle limit (but at least one when there are parts).
/// When any part task records an error, that error is rethrown after every started task has finished;
/// <c>Validate</c> is only invoked when no part task recorded an error.
/// </remarks>
private void DoResumableDownloadMultiThread(DownloadObjectRequest request, ResumableDownloadContext resumableContext,
EventHandler<StreamTransferProgressArgs> downloadProgressCallback)
{
    // WaitHandle.WaitAny accepts at most 64 handles, and only 63 on an STA thread.
    const int MaxConcurrentTasks = 63;

    _downloadedBytes = resumableContext.GetDownloadedBytes();
    if (!request.MatchingETagConstraints.Contains(resumableContext.ETag))
    {
        request.MatchingETagConstraints.Add(resumableContext.ETag);
    }

    Exception e = null;
    int partCount = resumableContext.PartContextList.Count;

    // A non-positive ParallelThreadCount falls back to a single worker instead of failing
    // later with an obscure wait-handle/array-size exception.
    int workerLimit = Math.Max(1, Math.Min(Math.Min(request.ParallelThreadCount, Environment.ProcessorCount), MaxConcurrentTasks));
    int parallel = Math.Min(workerLimit, partCount);

    ManualResetEvent[] taskFinishedEvents = new ManualResetEvent[parallel];
    DownloadTaskParam[] taskParams = new DownloadTaskParam[parallel];
    int nextPart = 0;
    bool allTasksDone = false;
    try
    {
        // Fill every slot with an initial part. This runs inside the try block so that a failure
        // while starting one task still waits for (and cleans up) the tasks already started.
        for (; nextPart < parallel; nextPart++)
        {
            taskParams[nextPart] = StartDownloadPartTask(request, resumableContext.PartContextList[nextPart], downloadProgressCallback);
            taskFinishedEvents[nextPart] = taskParams[nextPart].DownloadFinished;
        }

        long totalBytes = resumableContext.GetTotalBytes();
        long lastDownloadedBytes = _downloadedBytes;

        // Whenever a slot finishes, hand it the next pending part.
        while (nextPart < partCount)
        {
            // 'index' identifies the finished SLOT, not a position in PartContextList.
            int index = WaitHandle.WaitAny(taskFinishedEvents);
            if (taskParams[index].Error == null)
            {
                if (request.StreamTransferProgress != null)
                {
                    // Report the bytes gained since the previous report, from a single snapshot
                    // of the shared counter so increment and running total agree.
                    long downloadedNow = _downloadedBytes;
                    StreamTransferProgressArgs args = new StreamTransferProgressArgs(downloadedNow - lastDownloadedBytes, downloadedNow, totalBytes);
                    lastDownloadedBytes = downloadedNow;
                    request.StreamTransferProgress.Invoke(this, args);
                }

                // Presumably persists the checkpoint so a later run can resume — confirm in ResumableDownloadContext.
                resumableContext.Dump();
            }
            else
            {
                e = taskParams[index].Error;
            }

            // Start the replacement before closing the finished event: if starting throws, the slot
            // still holds a valid (signaled) handle for the cleanup code to wait on.
            ManualResetEvent finishedEvent = taskFinishedEvents[index];
            DownloadTaskParam nextTask = StartDownloadPartTask(request, resumableContext.PartContextList[nextPart++], downloadProgressCallback);
            taskParams[index] = nextTask;
            taskFinishedEvents[index] = nextTask.DownloadFinished;
            finishedEvent.Close();
        }

        // Wait handle by handle rather than with WaitAll, which is not supported on STA threads
        // for multiple handles and rejects an empty array.
        foreach (ManualResetEvent finished in taskFinishedEvents)
        {
            finished.WaitOne();
        }
        allTasksDone = true;
        // NOTE(review): Dump() is not called for the parts that finish during this final wait,
        // matching the original behavior — confirm whether the caller persists/clears the context.

        if (request.StreamTransferProgress != null)
        {
            long downloadedNow = _downloadedBytes;
            StreamTransferProgressArgs args = new StreamTransferProgressArgs(downloadedNow - lastDownloadedBytes, downloadedNow, totalBytes);
            request.StreamTransferProgress.Invoke(this, args);
        }

        // Pick up errors from the last batch of tasks BEFORE deciding whether to validate,
        // so validation never runs against a download with a failed part.
        for (int i = 0; i < parallel; i++)
        {
            if (taskParams[i].Error != null)
            {
                e = taskParams[i].Error;
            }
        }

        if (e == null)
        {
            Validate(request, resumableContext);
        }
    }
    finally
    {
        for (int i = 0; i < parallel; i++)
        {
            ManualResetEvent finished = taskFinishedEvents[i];
            if (finished == null)
            {
                // Slot was never started because task start-up failed part-way through.
                continue;
            }

            if (!allTasksDone)
            {
                // Leaving early (exception): let the running task finish, then record its outcome.
                finished.WaitOne();
                if (taskParams[i].Error != null)
                {
                    e = taskParams[i].Error;
                }
            }

            finished.Close();
        }

        // A part-task error takes precedence over any exception already propagating,
        // as in the original implementation.
        if (e != null)
        {
            throw e;
        }
    }
}