sdk/Common/ResumableUploadManager.cs
/*
* Copyright (C) Alibaba Cloud Computing
* All rights reserved.
*
*/
using System;
using System.IO;
using System.Net;
using System.Threading;
using System.Collections.Generic;
using Aliyun.OSS.Domain;
using Aliyun.OSS.Commands;
using Aliyun.OSS.Util;
using Aliyun.OSS.Common;
namespace Aliyun.OSS
{
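/// <summary>
/// Drives resumable (multipart) uploads: retries failed passes and uploads the parts
/// recorded in a ResumableContext, either on a single thread or in parallel depending
/// on the stream type and the client configuration.
/// </summary>
/// <example>
/// A rough internal usage sketch (assumes the caller has already prepared the
/// UploadObjectRequest and the ResumableContext checkpoint; variable names are illustrative):
/// <code>
/// var manager = new ResumableUploadManager(ossClient, maxRetryTimes, clientConfiguration);
/// manager.ResumableUploadWithRetry(uploadObjectRequest, resumableContext);
/// </code>
/// </example>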
internal class ResumableUploadManager
{
private OssClient _ossClient;
private int _maxRetryTimes;
private ClientConfiguration _conf;
private long _uploadedBytes;
private long _totalBytes;
private EventHandler<StreamTransferProgressArgs> _uploadProgressCallback;
private long _incrementalUploadedBytes;
private object _callbackLock = new object();
private UploadObjectRequest _request;
public ResumableUploadManager(OssClient ossClient, int maxRetryTimes, ClientConfiguration conf)
{
this._ossClient = ossClient;
this._maxRetryTimes = maxRetryTimes;
this._conf = conf;
}
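/// <summary>
/// Runs the resumable upload over the given file or stream, retrying a failed pass up to
/// _maxRetryTimes times with a one-second pause between attempts.
/// </summary>
/// <param name="request">The upload request (bucket, key, source file or stream, options).</param>
/// <param name="resumableContext">Checkpoint describing which parts are already uploaded.</param>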
public void ResumableUploadWithRetry(UploadObjectRequest request, ResumableContext resumableContext)
{
_request = request;
using (Stream fs = request.UploadStream ?? new FileStream(request.UploadFile, FileMode.Open, FileAccess.Read, FileShare.Read))
{
for (int i = 0; i < _maxRetryTimes; i++)
{
try
{
DoResumableUpload(request.BucketName, request.Key, resumableContext, fs, request.StreamTransferProgress);
break;
}
catch (Exception ex)
{
if (i != _maxRetryTimes - 1)
{
Thread.Sleep(1000);
continue;
}
else
{
// rethrow without resetting the original stack trace
throw;
}
}
}
}
}
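/// <summary>
/// Picks the upload strategy: single-threaded, multi-threaded reads from a FileStream,
/// or a single pre-read thread feeding multiple upload threads for other streams.
/// </summary>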
private void DoResumableUpload(string bucketName, string key, ResumableContext resumableContext, Stream fs,
EventHandler<StreamTransferProgressArgs> uploadProgressCallback)
{
bool isFileStream = fs is FileStream;
// Use the single-threaded path when ParallelThreadCount is 1 or less, or when the stream
// either is not a FileStream or single-threaded reads are forced, and the part size is
// larger than conf.MaxPartCachingSize.
if (_request.ParallelThreadCount <= 1 || (!isFileStream || _conf.UseSingleThreadReadInResumableUpload) && resumableContext.PartContextList[0].Length > _conf.MaxPartCachingSize)
{
DoResumableUploadSingleThread(bucketName, key, resumableContext, fs, uploadProgressCallback);
}
else if (isFileStream && !_conf.UseSingleThreadReadInResumableUpload)
{
// multiple threads each read from the file and upload their parts
DoResumableUploadFileMultiThread(bucketName, key, resumableContext, fs as FileStream, uploadProgressCallback);
}
else
{
// a single thread pre-reads the data into buffers and multiple threads upload the parts
DoResumableUploadPreReadMultiThread(bucketName, key, resumableContext, fs, uploadProgressCallback);
}
}
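/// <summary>
/// Uploads the remaining parts sequentially on the calling thread, seeking the source
/// stream to each part's offset and dumping the checkpoint after every part.
/// </summary>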
private void DoResumableUploadSingleThread(string bucketName, string key, ResumableContext resumableContext, Stream fs,
EventHandler<StreamTransferProgressArgs> uploadProgressCallback)
{
var uploadedBytes = resumableContext.GetUploadedBytes();
foreach (var part in resumableContext.PartContextList)
{
if (part.IsCompleted)
{
continue;
}
fs.Seek(part.Position, SeekOrigin.Begin);
var originalStream = fs;
if (uploadProgressCallback != null)
{
originalStream = _ossClient.SetupProgressListeners(originalStream,
fs.Length,
uploadedBytes,
_conf.ProgressUpdateInterval,
uploadProgressCallback);
}
var request = new UploadPartRequest(bucketName, key, resumableContext.UploadId)
{
InputStream = originalStream,
PartSize = part.Length,
PartNumber = part.PartId,
RequestPayer = _request.RequestPayer,
TrafficLimit = _request.TrafficLimit
};
var partResult = _ossClient.UploadPart(request);
part.PartETag = partResult.PartETag;
part.IsCompleted = true;
if (partResult.ResponseMetadata.ContainsKey(HttpHeaders.HashCrc64Ecma))
{
part.Crc64 = ulong.Parse(partResult.ResponseMetadata[HttpHeaders.HashCrc64Ecma]);
}
resumableContext.Dump();
uploadedBytes += part.Length;
}
}
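/// <summary>
/// Aggregates progress reported by the upload threads. Counters are updated with
/// Interlocked operations, and the user callback is invoked under a lock only after at
/// least ProgressUpdateInterval new bytes have been transferred.
/// </summary>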
private void ProcessCallbackInternal(object sender, StreamTransferProgressArgs e)
{
Interlocked.Add(ref _uploadedBytes, e.IncrementTransferred);
Interlocked.Add(ref _incrementalUploadedBytes, e.IncrementTransferred);
if (_uploadProgressCallback != null && _incrementalUploadedBytes >= _conf.ProgressUpdateInterval)
{
lock (this._callbackLock)
{
if (_incrementalUploadedBytes >= _conf.ProgressUpdateInterval)
{
long incrementalUploadedBytes = _incrementalUploadedBytes;
StreamTransferProgressArgs progress = new StreamTransferProgressArgs(incrementalUploadedBytes, _uploadedBytes, _totalBytes);
_uploadProgressCallback.Invoke(this, progress);
Interlocked.Add(ref _incrementalUploadedBytes, -incrementalUploadedBytes);
}
}
}
}
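/// <summary>
/// State shared between the pre-read thread and the coordinating thread: a pool of
/// reusable MemoryStream buffers and a queue of filled upload tasks, each protected by
/// its own lock and signalled through a ManualResetEvent.
/// </summary>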
internal class PreReadThreadParam
{
public Stream Fs;
public ResumableContext ResumableContext;
public Exception PreReadError;
private Queue<UploadTask> Queue = new Queue<UploadTask>();
private Queue<MemoryStream> BufferPool = new Queue<MemoryStream>();
private object _bufferLock = new object();
private object _taskLock = new object();
private ManualResetEvent _taskAvailable = new ManualResetEvent(false);
private ManualResetEvent _bufferAvailable = new ManualResetEvent(false);
public MemoryStream TakeBuffer()
{
if (!_bufferAvailable.WaitOne(1000))
{
return null;
}
lock(_bufferLock)
{
MemoryStream buffer = BufferPool.Dequeue();
if (BufferPool.Count == 0)
{
_bufferAvailable.Reset();
}
return buffer;
}
}
public void ReturnBuffer(MemoryStream ms)
{
lock(_bufferLock)
{
BufferPool.Enqueue(ms);
if (BufferPool.Count == 1)
{
_bufferAvailable.Set();
}
}
}
public UploadTask TakeTask()
{
if (!_taskAvailable.WaitOne(1000 * 10))
{
return null;
}
lock(_taskLock)
{
UploadTask param = Queue.Dequeue();
if (Queue.Count == 0)
{
_taskAvailable.Reset();
}
return param;
}
}
public void CreateTask(UploadTask task)
{
lock(_taskLock)
{
Queue.Enqueue(task);
if (Queue.Count == 1)
{
_taskAvailable.Set();
}
}
}
public int GetTaskLength()
{
return Queue.Count;
}
public int GetBufferLength()
{
return BufferPool.Count;
}
public bool RequestStopPreRead
{
get;
set;
}
}
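/// <summary>
/// Pre-read thread entry point: takes a buffer from the pool, fills it with the next
/// part's data from the source stream and queues an UploadTask, until all parts are
/// read, a stop is requested, or a read error is recorded in PreReadError.
/// </summary>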
private void StartPreRead(object state)
{
PreReadThreadParam preReadThreadParam = state as PreReadThreadParam;
if (preReadThreadParam == null)
{
throw new ClientException("Internal error: the state must be type of PreReadThreadParam");
}
int nextPart = 0;
try
{
while (!preReadThreadParam.RequestStopPreRead && nextPart < preReadThreadParam.ResumableContext.PartContextList.Count)
{
MemoryStream buffer = preReadThreadParam.TakeBuffer();
if (buffer == null)
{
continue;
}
UploadTask param = new UploadTask();
param.UploadFileStream = preReadThreadParam.Fs;
param.InputStream = buffer;
param.ResumableUploadContext = preReadThreadParam.ResumableContext;
param.ResumableUploadPartContext = preReadThreadParam.ResumableContext.PartContextList[nextPart++];
param.UploadProgressCallback = _uploadProgressCallback;
param.ProgressUpdateInterval = _conf.ProgressUpdateInterval;
param.Finished = new ManualResetEvent(false);
int readCount = 0;
while (readCount != param.ResumableUploadPartContext.Length)
{
int count = preReadThreadParam.Fs.Read(buffer.GetBuffer(), readCount, (int)param.ResumableUploadPartContext.Length - readCount);
if (count == 0)
{
throw new System.IO.IOException(string.Format("Unable to read data with expected size. Expected size:{0}, actual read size: {1}", param.ResumableUploadPartContext.Length, readCount));
}
readCount += count;
}
param.InputStream.SetLength(readCount);
preReadThreadParam.CreateTask(param);
}
}
catch(Exception e)
{
preReadThreadParam.PreReadError = e;
}
}
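/// <summary>
/// Builds the UploadTask for the i-th part, opening a dedicated read-only FileStream so
/// each upload thread can seek and read independently.
/// </summary>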
private UploadTask CreateTask(int i, ResumableContext resumableContext, FileStream fs)
{
UploadTask param = new UploadTask();
param.UploadFileStream = fs;
param.InputStream = new FileStream(fs.Name, FileMode.Open, FileAccess.Read, FileShare.Read);
param.ResumableUploadContext = resumableContext;
param.ResumableUploadPartContext = resumableContext.PartContextList[i];
param.UploadProgressCallback = _uploadProgressCallback;
param.ProgressUpdateInterval = _conf.ProgressUpdateInterval;
param.Finished = new ManualResetEvent(false);
return param;
}
/// <summary>
/// Does the resumable upload with multiple threads, each reading directly from the file.
/// </summary>
/// <param name="bucketName">Bucket name.</param>
/// <param name="key">Object key.</param>
/// <param name="resumableContext">Resumable context.</param>
/// <param name="fs">The file stream to upload.</param>
/// <param name="uploadProgressCallback">Upload progress callback.</param>
private void DoResumableUploadFileMultiThread(string bucketName, string key, ResumableContext resumableContext, FileStream fs,
EventHandler<StreamTransferProgressArgs> uploadProgressCallback)
{
_uploadProgressCallback = uploadProgressCallback;
_uploadedBytes = resumableContext.GetUploadedBytes();
_totalBytes = fs.Length;
_incrementalUploadedBytes = 0;
Exception e = null;
int parallel = Math.Min(_request.ParallelThreadCount, resumableContext.PartContextList.Count);
ManualResetEvent[] taskFinishEvents = new ManualResetEvent[parallel];
UploadTask[] runningTasks = new UploadTask[parallel];
fs.Seek(0, SeekOrigin.Begin);
bool allTaskDone = false;
for (int i = 0; i < parallel; i++)
{
UploadTask param = CreateTask(i, resumableContext, fs);
taskFinishEvents[i] = param.Finished;
runningTasks[i] = param;
StartUploadPartTask(param);
}
int nextPart = parallel;
try
{
while (nextPart < resumableContext.PartContextList.Count)
{
int index = ManualResetEvent.WaitAny(taskFinishEvents);
if (runningTasks[index].Error == null)
{
resumableContext.Dump();
}
else
{
e = runningTasks[index].Error;
}
runningTasks[index].Finished.Close();
runningTasks[index].InputStream.Dispose();
UploadTask task = CreateTask(nextPart, resumableContext, fs);
StartUploadPartTask(task);
runningTasks[index] = task;
taskFinishEvents[index] = runningTasks[index].Finished;
nextPart++;
}
WaitHandle.WaitAll(taskFinishEvents);
allTaskDone = true;
}
finally
{
if (!allTaskDone)
{
WaitHandle.WaitAll(taskFinishEvents);
}
if (uploadProgressCallback != null)
{
long latestUploadedBytes = resumableContext.GetUploadedBytes();
long lastIncrementalUploadedBytes = latestUploadedBytes - _uploadedBytes + _incrementalUploadedBytes;
if (lastIncrementalUploadedBytes > 0)
{
StreamTransferProgressArgs progress = new StreamTransferProgressArgs(lastIncrementalUploadedBytes, latestUploadedBytes, fs.Length);
uploadProgressCallback.Invoke(this, progress);
}
_uploadedBytes = latestUploadedBytes;
}
for (int i = 0; i < parallel; i++)
{
taskFinishEvents[i].Close();
if (runningTasks[i].Error != null)
{
e = runningTasks[i].Error;
}
runningTasks[i].InputStream.Dispose();
}
resumableContext.Dump();
if (e != null)
{
throw e;
}
}
}
/// <summary>
/// Does the resumable upload from a non-file stream: a single thread pre-reads the data
/// into memory buffers while multiple threads upload the parts.
/// </summary>
/// <param name="bucketName">Bucket name.</param>
/// <param name="key">Object key.</param>
/// <param name="resumableContext">Resumable context.</param>
/// <param name="fs">The source stream to upload.</param>
/// <param name="uploadProgressCallback">Upload progress callback.</param>
private void DoResumableUploadPreReadMultiThread(string bucketName, string key, ResumableContext resumableContext, Stream fs,
EventHandler<StreamTransferProgressArgs> uploadProgressCallback)
{
_uploadProgressCallback = uploadProgressCallback;
_uploadedBytes = resumableContext.GetUploadedBytes();
_totalBytes = fs.Length;
_incrementalUploadedBytes = 0;
Exception e = null;
int parallel = Math.Min(_request.ParallelThreadCount, resumableContext.PartContextList.Count);
int preReadPartCount = Math.Min(parallel, _conf.PreReadBufferCount) + parallel;
ManualResetEvent[] taskFinishEvents = new ManualResetEvent[parallel];
UploadTask[] runningTasks = new UploadTask[parallel];
fs.Seek(0, SeekOrigin.Begin);
// init the buffer pool
PreReadThreadParam preReadParam = new PreReadThreadParam();
preReadParam.Fs = fs;
preReadParam.ResumableContext = resumableContext;
for (int i = 0; i < preReadPartCount && i < resumableContext.PartContextList.Count; i++)
{
var part = resumableContext.PartContextList[i];
preReadParam.ReturnBuffer(new MemoryStream((int)part.Length));
}
Thread thread = new Thread(new ParameterizedThreadStart(StartPreRead));
thread.Start(preReadParam);
bool allTaskDone = false;
for (int i = 0; i < parallel; i++)
{
UploadTask task = preReadParam.TakeTask();
if (task == null)
{
continue;
}
taskFinishEvents[i] = task.Finished;
runningTasks[i] = task;
StartUploadPartTask(task);
}
int nextPart = parallel;
try
{
int waitingCount = 0;
const int MaxWaitingCount = 100;
while (nextPart < resumableContext.PartContextList.Count && waitingCount < MaxWaitingCount)
{
int index = ManualResetEvent.WaitAny(taskFinishEvents);
if (runningTasks[index].Error == null)
{
resumableContext.Dump();
}
else
{
e = runningTasks[index].Error;
}
runningTasks[index].Finished.Close();
preReadParam.ReturnBuffer(runningTasks[index].InputStream as MemoryStream);
UploadTask task = preReadParam.TakeTask();
if (task == null)
{
waitingCount++;
if (preReadParam.PreReadError != null) // no more tasks will be created
{
break;
}
continue;
}
StartUploadPartTask(task);
runningTasks[index] = task;
taskFinishEvents[index] = runningTasks[index].Finished;
nextPart++;
}
if (waitingCount >= MaxWaitingCount)
{
e = new ClientException("Fail to read the data from local stream");
}
WaitHandle.WaitAll(taskFinishEvents);
allTaskDone = true;
}
finally
{
preReadParam.RequestStopPreRead = true;
if (!allTaskDone)
{
WaitHandle.WaitAll(taskFinishEvents);
}
if (uploadProgressCallback != null)
{
long latestUploadedBytes = resumableContext.GetUploadedBytes();
long lastIncrementalUploadedBytes = latestUploadedBytes - _uploadedBytes + _incrementalUploadedBytes;
if (lastIncrementalUploadedBytes > 0)
{
StreamTransferProgressArgs progress = new StreamTransferProgressArgs(lastIncrementalUploadedBytes, latestUploadedBytes, fs.Length);
uploadProgressCallback.Invoke(this, progress);
}
_uploadedBytes = latestUploadedBytes;
}
for (int i = 0; i < parallel; i++)
{
if (runningTasks[i].Error != null)
{
e = runningTasks[i].Error;
}
runningTasks[i].Finished.Close();
}
if (preReadParam.PreReadError != null)
{
e = preReadParam.PreReadError;
}
MemoryStream buffer = null;
while (preReadParam.GetBufferLength() != 0 && (buffer = preReadParam.TakeBuffer()) != null)
{
buffer.Dispose();
}
resumableContext.Dump();
if (e != null)
{
throw e;
}
}
}
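/// <summary>
/// Per-part unit of work handed to the thread pool: the part's input stream, its
/// checkpoint context, the progress callback, an error slot and a completion event.
/// </summary>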
internal class UploadTask
{
public Stream UploadFileStream
{
get;
set;
}
public Stream InputStream
{
get;
set;
}
public ResumableContext ResumableUploadContext
{
get;
set;
}
public ResumablePartContext ResumableUploadPartContext
{
get;
set;
}
public EventHandler<StreamTransferProgressArgs> UploadProgressCallback
{
get;
set;
}
public long ProgressUpdateInterval
{
get;
set;
}
public ManualResetEvent Finished
{
get;
set;
}
public Exception Error
{
get;
set;
}
}
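/// <summary>
/// Schedules the upload of a single part on the thread pool.
/// </summary>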
private void StartUploadPartTask(UploadTask taskParam)
{
ThreadPool.QueueUserWorkItem(UploadPart, taskParam);
}
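/// <summary>
/// Thread-pool callback that uploads one part: skips parts already completed, retries up
/// to three times on ObjectDisposedException or WebException, records the ETag and
/// optional CRC64 on success, stores any error on the task and always signals Finished.
/// </summary>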
private void UploadPart(object state)
{
UploadTask taskParam = state as UploadTask;
if (taskParam == null)
{
throw new ClientException("Internal error. The state object should be an instance of class UploadTaskParam");
}
try
{
ResumablePartContext part = taskParam.ResumableUploadPartContext;
if (part.IsCompleted)
{
return;
}
const int retryCount = 3;
Stream stream = taskParam.InputStream;
for (int i = 0; i < retryCount; i++)
{
if (stream is FileStream)
{
stream.Seek(part.Position, SeekOrigin.Begin);
}
else
{
stream.Seek(0, SeekOrigin.Begin);
}
Stream progressCallbackStream = null;
try
{
if (taskParam.UploadProgressCallback != null)
{
progressCallbackStream = _ossClient.SetupProgressListeners(stream,
taskParam.UploadFileStream.Length, // total length; not used by ProcessCallbackInternal
_uploadedBytes, // starting offset; not used by ProcessCallbackInternal
Math.Min(taskParam.ProgressUpdateInterval, 1024 * 4),
this.ProcessCallbackInternal);
}
var request = new UploadPartRequest(taskParam.ResumableUploadContext.BucketName, taskParam.ResumableUploadContext.Key, taskParam.ResumableUploadContext.UploadId)
{
InputStream = progressCallbackStream ?? stream,
PartSize = part.Length,
PartNumber = part.PartId,
RequestPayer = _request.RequestPayer,
TrafficLimit = _request.TrafficLimit
};
var partResult = _ossClient.UploadPart(request);
part.PartETag = partResult.PartETag;
if (partResult.ResponseMetadata.ContainsKey(HttpHeaders.HashCrc64Ecma))
{
part.Crc64 = ulong.Parse(partResult.ResponseMetadata[HttpHeaders.HashCrc64Ecma]);
}
part.IsCompleted = true;
break;
}
catch (Exception ex) // if the connection is closed while the data is being sent, an ObjectDisposedException can be thrown.
{
if (!(ex is ObjectDisposedException || ex is WebException) || i == retryCount - 1)
{
throw;
}
}
finally
{
if (progressCallbackStream != null)
{
progressCallbackStream.Dispose();
}
}
}
}
catch(Exception e)
{
taskParam.Error = e;
}
finally
{
taskParam.Finished.Set();
}
}
}
}