in sdk/Common/ResumableUploadManager.cs [323:411]
private void DoResumableUploadFileMultiThread(string bucketName, string key, ResumableContext resumableContext, FileStream fs,
EventHandler<StreamTransferProgressArgs> uploadProgressCallback)
{
_uploadProgressCallback = uploadProgressCallback;
_uploadedBytes = resumableContext.GetUploadedBytes();
_totalBytes = fs.Length;
_incrementalUploadedBytes = 0;
Exception e = null;
int parallel = Math.Min(_request.ParallelThreadCount, resumableContext.PartContextList.Count);
ManualResetEvent[] taskFinishEvents = new ManualResetEvent[parallel];
UploadTask[] runningTasks = new UploadTask[parallel];
fs.Seek(0, SeekOrigin.Begin);
bool allTaskDone = false;
for (int i = 0; i < parallel; i++)
{
UploadTask param = CreateTask(i, resumableContext, fs);
taskFinishEvents[i] = param.Finished;
runningTasks[i] = param;
StartUploadPartTask(param);
}
int nextPart = parallel;
try
{
while (nextPart < resumableContext.PartContextList.Count)
{
int index = ManualResetEvent.WaitAny(taskFinishEvents);
if (runningTasks[index].Error == null)
{
resumableContext.Dump();
}
else
{
e = runningTasks[index].Error;
}
runningTasks[index].Finished.Close();
runningTasks[index].InputStream.Dispose();
UploadTask task = CreateTask(nextPart, resumableContext, fs);
StartUploadPartTask(task);
runningTasks[index] = task;
taskFinishEvents[index] = runningTasks[index].Finished;
nextPart++;
}
WaitHandle.WaitAll(taskFinishEvents);
allTaskDone = true;
}
finally
{
if (!allTaskDone)
{
WaitHandle.WaitAll(taskFinishEvents);
}
if (uploadProgressCallback != null)
{
long latestUploadedBytes = resumableContext.GetUploadedBytes();
long lastIncrementalUploadedBytes = latestUploadedBytes - _uploadedBytes + _incrementalUploadedBytes;
if (lastIncrementalUploadedBytes > 0)
{
StreamTransferProgressArgs progress = new StreamTransferProgressArgs(lastIncrementalUploadedBytes, latestUploadedBytes, fs.Length);
uploadProgressCallback.Invoke(this, progress);
}
_uploadedBytes = latestUploadedBytes;
}
for (int i = 0; i < parallel; i++)
{
taskFinishEvents[i].Close();
if (runningTasks[i].Error != null)
{
e = runningTasks[i].Error;
}
runningTasks[i].InputStream.Dispose();
}
resumableContext.Dump();
if (e != null)
{
throw e;
}
}
}