private void DoResumableUploadFileMultiThread()

in sdk/Common/ResumableUploadManager.cs [323:411]


        /// <summary>
        /// Uploads the parts listed in <paramref name="resumableContext"/> with up to
        /// <c>_request.ParallelThreadCount</c> upload tasks running at the same time, and blocks
        /// until every task that was started has finished.
        /// </summary>
        /// <param name="bucketName">Not used by this method.</param>
        /// <param name="key">Not used by this method.</param>
        /// <param name="resumableContext">
        /// Checkpoint state: supplies the part list and the uploaded byte count, and is dumped after
        /// each part that finishes without an error and once more before the method returns or throws.
        /// </param>
        /// <param name="fs">The file being uploaded; rewound to offset 0 and passed to every created task.</param>
        /// <param name="uploadProgressCallback">
        /// Optional progress handler. It is stored in <c>_uploadProgressCallback</c> and, when not
        /// null, invoked once after all tasks finished if bytes were uploaded that are not yet
        /// covered by <c>_uploadedBytes</c> and <c>_incrementalUploadedBytes</c>.
        /// </param>
        /// <exception cref="Exception">
        /// The <c>Error</c> recorded by an upload task (the last one observed), thrown only after all
        /// started tasks finished and their events and streams were released. If the scheduling code
        /// itself fails, that exception propagates instead.
        /// </exception>
        private void DoResumableUploadFileMultiThread(string bucketName, string key, ResumableContext resumableContext, FileStream fs,
                                   EventHandler<StreamTransferProgressArgs> uploadProgressCallback)
        {
            _uploadProgressCallback = uploadProgressCallback;
            _uploadedBytes = resumableContext.GetUploadedBytes();
            _totalBytes = fs.Length;
            _incrementalUploadedBytes = 0;

            Exception taskError = null;
            int parallel = Math.Min(_request.ParallelThreadCount, resumableContext.PartContextList.Count);

            ManualResetEvent[] taskFinishEvents = new ManualResetEvent[parallel];
            UploadTask[] runningTasks = new UploadTask[parallel];

            // Number of leading slots in the two arrays that hold a started task. Only these slots
            // are waited on and released, so a failure while filling the arrays cannot make the
            // cleanup code touch a null entry.
            int startedCount = 0;
            fs.Seek(0, SeekOrigin.Begin);

            try
            {
                // Start the first batch inside the try block so that tasks which are already
                // running are still waited for and released if a later create/start call throws.
                while (startedCount < parallel)
                {
                    UploadTask task = CreateAndStartUploadTask(startedCount, resumableContext, fs);
                    runningTasks[startedCount] = task;
                    taskFinishEvents[startedCount] = task.Finished;
                    startedCount++;
                }

                int nextPart = parallel;
                while (nextPart < resumableContext.PartContextList.Count)
                {
                    int index = WaitHandle.WaitAny(taskFinishEvents);
                    UploadTask finishedTask = runningTasks[index];
                    if (finishedTask.Error == null)
                    {
                        resumableContext.Dump();
                    }
                    else
                    {
                        // Remember the failure but keep scheduling the remaining parts.
                        taskError = finishedTask.Error;
                    }

                    // Start the replacement BEFORE releasing the finished task: if creating or
                    // starting it throws, the slot still refers to an open, already signalled event
                    // instead of a closed handle that would break the wait in the finally block.
                    UploadTask replacement = CreateAndStartUploadTask(nextPart, resumableContext, fs);
                    runningTasks[index] = replacement;
                    taskFinishEvents[index] = replacement.Finished;
                    ReleaseUploadTask(finishedTask);
                    nextPart++;
                }
            }
            finally
            {
                try
                {
                    // Wait handle by handle rather than with WaitHandle.WaitAll: WaitAll throws
                    // NotSupportedException on STA threads and for more than 64 handles, and rejects
                    // an empty array. The handles are ManualResetEvents, so a signalled one stays
                    // signalled and waiting on them in turn returns once all of them are set.
                    for (int i = 0; i < startedCount; i++)
                    {
                        taskFinishEvents[i].WaitOne();
                    }

                    if (uploadProgressCallback != null)
                    {
                        long latestUploadedBytes = resumableContext.GetUploadedBytes();
                        long lastIncrementalUploadedBytes = latestUploadedBytes - _uploadedBytes + _incrementalUploadedBytes;
                        if (lastIncrementalUploadedBytes > 0)
                        {
                            StreamTransferProgressArgs progress = new StreamTransferProgressArgs(lastIncrementalUploadedBytes, latestUploadedBytes, fs.Length);
                            uploadProgressCallback.Invoke(this, progress);
                        }

                        _uploadedBytes = latestUploadedBytes;
                    }
                }
                finally
                {
                    // Nested finally: a throwing progress callback must not skip releasing the
                    // tasks or writing the final checkpoint.
                    for (int i = 0; i < startedCount; i++)
                    {
                        if (runningTasks[i].Error != null)
                        {
                            taskError = runningTasks[i].Error;
                        }
                        ReleaseUploadTask(runningTasks[i]);
                    }

                    resumableContext.Dump();
                }
            }

            // Thrown after (not inside) the finally block so that an exception already propagating
            // from the scheduling code above is not silently replaced. The task's own exception
            // object is thrown unwrapped so callers keep catching the same exception type.
            if (taskError != null)
            {
                throw taskError;
            }
        }

        /// <summary>
        /// Creates the upload task for the part at <paramref name="partIndex"/> via
        /// <c>CreateTask</c> and starts it via <c>StartUploadPartTask</c>.
        /// </summary>
        /// <remarks>
        /// If starting the task throws, its event and stream are released before the exception
        /// propagates. NOTE(review): this assumes a throwing <c>StartUploadPartTask</c> has not
        /// handed the task to a worker - confirm against its implementation.
        /// </remarks>
        private UploadTask CreateAndStartUploadTask(int partIndex, ResumableContext resumableContext, FileStream fs)
        {
            UploadTask task = CreateTask(partIndex, resumableContext, fs);
            try
            {
                StartUploadPartTask(task);
            }
            catch
            {
                ReleaseUploadTask(task);
                throw;
            }

            return task;
        }

        /// <summary>
        /// Closes the task's <c>Finished</c> event and disposes its <c>InputStream</c>.
        /// </summary>
        private static void ReleaseUploadTask(UploadTask task)
        {
            task.Finished.Close();
            task.InputStream.Dispose();
        }