in sdk/Common/ResumableUploadManager.cs [421:555]
/// <summary>
/// Uploads the parts of <paramref name="resumableContext"/> using several concurrent upload tasks
/// while a separate thread (running <c>StartPreRead</c>) reads data from <paramref name="fs"/>
/// into pooled <see cref="MemoryStream"/> buffers ahead of the uploads.
/// </summary>
/// <param name="bucketName">Not referenced by this method.</param>
/// <param name="key">Not referenced by this method.</param>
/// <param name="resumableContext">Checkpoint state; supplies the part list and is dumped as parts finish.</param>
/// <param name="fs">Source stream; it is rewound to the beginning and handed to the pre-read thread.</param>
/// <param name="uploadProgressCallback">Optional progress callback; receives a final progress event if bytes were uploaded.</param>
/// <exception cref="ClientException">
/// Thrown when no task could be obtained from the pre-read thread after the maximum number of attempts
/// (unless a task error or pre-read error was recorded, which takes precedence).
/// </exception>
private void DoResumableUploadPreReadMultiThread(string bucketName, string key, ResumableContext resumableContext, Stream fs,
                                                 EventHandler<StreamTransferProgressArgs> uploadProgressCallback)
{
    _uploadProgressCallback = uploadProgressCallback;
    _uploadedBytes = resumableContext.GetUploadedBytes();
    _totalBytes = fs.Length;
    _incrementalUploadedBytes = 0;

    Exception e = null;
    int partCount = resumableContext.PartContextList.Count;
    int parallel = Math.Min(_request.ParallelThreadCount, partCount);
    int preReadPartCount = Math.Min(parallel, _conf.PreReadBufferCount) + parallel;

    // One slot per concurrent upload. A null entry in runningTasks means the slot is free;
    // taskFinishEvents[i] is non-null exactly when runningTasks[i] is non-null.
    ManualResetEvent[] taskFinishEvents = new ManualResetEvent[parallel];
    UploadTask[] runningTasks = new UploadTask[parallel];

    fs.Seek(0, SeekOrigin.Begin);

    // init the buffer pool
    PreReadThreadParam preReadParam = new PreReadThreadParam();
    preReadParam.Fs = fs;
    preReadParam.ResumableContext = resumableContext;
    for (int i = 0; i < preReadPartCount && i < partCount; i++)
    {
        var part = resumableContext.PartContextList[i];
        preReadParam.ReturnBuffer(new MemoryStream((int)part.Length));
    }

    Thread thread = new Thread(new ParameterizedThreadStart(StartPreRead));
    thread.Start(preReadParam);

    // Number of tasks actually handed to StartUploadPartTask. Counting real starts (instead of
    // assuming every initial slot was filled) keeps the loop running until every part was started.
    int startedParts = 0;
    try
    {
        int waitingCount = 0;
        const int MaxWaitingCount = 100;
        while (startedParts < partCount && waitingCount < MaxWaitingCount)
        {
            // Look for a free slot first; this also performs the initial fill.
            int index = -1;
            for (int i = 0; i < parallel; i++)
            {
                if (runningTasks[i] == null)
                {
                    index = i;
                    break;
                }
            }

            if (index < 0)
            {
                // Every slot is busy, so the handle array holds only live handles here.
                index = WaitHandle.WaitAny(taskFinishEvents);
                UploadTask finishedTask = runningTasks[index];
                if (finishedTask.Error == null)
                {
                    resumableContext.Dump();
                }
                else
                {
                    e = finishedTask.Error;
                }

                finishedTask.Finished.Close();

                // Free the slot right away so a closed handle is never waited on again and the
                // buffer is returned to the pool only once.
                runningTasks[index] = null;
                taskFinishEvents[index] = null;

                MemoryStream finishedBuffer = finishedTask.InputStream as MemoryStream;
                if (finishedBuffer != null)
                {
                    preReadParam.ReturnBuffer(finishedBuffer);
                }
            }

            // NOTE(review): assumes TakeTask waits for a while before returning null, so retrying
            // it for a free slot does not spin - confirm against PreReadThreadParam.TakeTask.
            UploadTask task = preReadParam.TakeTask();
            if (task == null)
            {
                waitingCount++;
                if (preReadParam.PreReadError != null) // no more task will be created;
                {
                    break;
                }

                continue;
            }

            // Start before registering: if starting throws, the slot stays free and the cleanup
            // below does not wait on a task that never ran.
            StartUploadPartTask(task);
            runningTasks[index] = task;
            taskFinishEvents[index] = task.Finished;
            startedParts++;
        }

        if (waitingCount >= MaxWaitingCount)
        {
            e = new ClientException("Fail to read the data from local stream");
        }
    }
    finally
    {
        preReadParam.RequestStopPreRead = true;

        // Wait for every task that is still registered. Waiting handle by handle avoids the
        // WaitHandle.WaitAll restrictions (empty array, more than 64 handles, STA threads).
        for (int i = 0; i < parallel; i++)
        {
            if (taskFinishEvents[i] != null)
            {
                taskFinishEvents[i].WaitOne();
            }
        }

        if (uploadProgressCallback != null)
        {
            long latestUploadedBytes = resumableContext.GetUploadedBytes();
            long lastIncrementalUploadedBytes = latestUploadedBytes - _uploadedBytes + _incrementalUploadedBytes;
            if (lastIncrementalUploadedBytes > 0)
            {
                StreamTransferProgressArgs progress = new StreamTransferProgressArgs(lastIncrementalUploadedBytes, latestUploadedBytes, fs.Length);
                uploadProgressCallback.Invoke(this, progress);
            }

            _uploadedBytes = latestUploadedBytes;
        }

        // Collect errors from the tasks that were still registered and release their handles.
        for (int i = 0; i < parallel; i++)
        {
            UploadTask remainingTask = runningTasks[i];
            if (remainingTask == null)
            {
                continue;
            }

            if (remainingTask.Error != null)
            {
                e = remainingTask.Error;
            }

            remainingTask.Finished.Close();
        }

        // A pre-read failure overrides any other recorded error.
        if (preReadParam.PreReadError != null)
        {
            e = preReadParam.PreReadError;
        }

        // Dispose whatever buffers are left in the pool.
        MemoryStream buffer = null;
        while (preReadParam.GetBufferLength() != 0 && (buffer = preReadParam.TakeBuffer()) != null)
        {
            buffer.Dispose();
        }

        resumableContext.Dump();

        // Thrown from within finally, so a recorded error replaces any exception that is
        // already propagating out of the try block.
        if (e != null)
        {
            throw e;
        }
    }
}