sdk/Common/Communication/RetryableServiceClient.cs (154 lines of code) (raw):

/*
 * Copyright (C) Alibaba Cloud Computing
 * All rights reserved.
 *
 */

using System;
using System.Net;
using System.Threading;
using System.IO;

using Aliyun.OSS.Util;

namespace Aliyun.OSS.Common.Communication
{
    /// <summary>
    /// Implementation of <see cref="IServiceClient"/> that will auto-retry HTTP requests
    /// when encountering some specific exceptions or failures.
    /// </summary>
    /// <remarks>
    /// Every call is delegated to the wrapped inner client. A failed call is retried when
    /// the request is repeatable, the retry budget (<see cref="MaxRetryTimes"/>) is not
    /// exhausted, and the failure is either an HTTP 500/503 carried by a
    /// <see cref="WebException"/> or accepted by <see cref="ShouldRetryCallback"/>.
    /// </remarks>
    internal class RetryableServiceClient : IServiceClient
    {
        #region Fields & Properties

        private const int DefaultMaxRetryTimes = 3;
        private const int DefaultRetryPauseScale = 300; // milliseconds.

        // Largest exponent used by Pause; 2^30 * DefaultRetryPauseScale still fits in a long.
        private const int MaxPauseExponent = 30;

        private readonly IServiceClient _innerClient;

        /// <summary>
        /// Optional predicate consulted for failures that are not an HTTP 500/503
        /// <see cref="WebException"/>; returning true makes the failure retryable.
        /// </summary>
        public OssFunc<Exception, bool> ShouldRetryCallback { get; set; }

        /// <summary>
        /// Maximum number of retries performed after the initial attempt.
        /// </summary>
        public int MaxRetryTimes { get; set; }

        #endregion

        #region Constructors

        /// <summary>
        /// Creates a retrying wrapper around <paramref name="innerClient"/> with
        /// <see cref="MaxRetryTimes"/> initialised to the default (3).
        /// </summary>
        /// <param name="innerClient">The client that actually sends the requests.</param>
        public RetryableServiceClient(IServiceClient innerClient)
        {
            _innerClient = innerClient;
            MaxRetryTimes = DefaultMaxRetryTimes;
        }

        #endregion

        #region IServiceClient Members

        /// <summary>
        /// Returns the wrapped client that requests are delegated to.
        /// </summary>
        internal IServiceClient InnerServiceClient()
        {
            return _innerClient;
        }

        /// <summary>
        /// Sends the request synchronously, retrying on retryable failures.
        /// </summary>
        /// <param name="request">The request to send.</param>
        /// <param name="context">The execution context passed through to the inner client.</param>
        /// <returns>The response returned by the inner client.</returns>
        public ServiceResponse Send(ServiceRequest request, ExecutionContext context)
        {
            return SendImpl(request, context, 0);
        }

        /// <summary>
        /// Performs one send attempt and, if it fails with a retryable error, rewinds the
        /// request content, pauses, and recurses for the next attempt.
        /// </summary>
        /// <param name="request">The request to send.</param>
        /// <param name="context">The execution context passed through to the inner client.</param>
        /// <param name="retryTimes">Number of retries already performed (0 for the first attempt).</param>
        /// <returns>The response returned by the inner client.</returns>
        private ServiceResponse SendImpl(ServiceRequest request,
                                         ExecutionContext context,
                                         int retryTimes)
        {
            // -1 means "position unknown / content not seekable": no rewind on retry.
            long originalContentPosition = -1;
            try
            {
                if (request.Content != null && request.Content.CanSeek)
                {
                    originalContentPosition = request.Content.Position;
                }

                return _innerClient.Send(request, context);
            }
            catch (Exception ex)
            {
                if (ShouldRetry(request, ex, retryTimes))
                {
                    // Rewind the body so the retry sends the same bytes as the failed attempt.
                    if (request.Content != null &&
                        (originalContentPosition >= 0 && request.Content.CanSeek))
                    {
                        request.Content.Seek(originalContentPosition, SeekOrigin.Begin);
                    }

                    Pause(retryTimes);

                    return SendImpl(request, context, ++retryTimes);
                }

                // Rethrow
                throw;
            }
        }

        /// <summary>
        /// Starts an asynchronous send. The returned object tracks the request, context and
        /// retry count so that <see cref="EndSend"/> can start further attempts.
        /// </summary>
        /// <param name="request">The request to send.</param>
        /// <param name="context">The execution context passed through to the inner client.</param>
        /// <param name="callback">Callback stored on the returned async result.</param>
        /// <param name="state">Caller state stored on the returned async result.</param>
        /// <returns>The <see cref="RetryableAsyncResult"/> tracking this operation.</returns>
        public IAsyncResult BeginSend(ServiceRequest request, ExecutionContext context,
                                      AsyncCallback callback, object state)
        {
            var asyncResult = new RetryableAsyncResult(callback, state, request, context);
            BeginSendImpl(request, context, asyncResult);
            return asyncResult;
        }

        /// <summary>
        /// Starts one asynchronous attempt on the inner client, disposing the async result of
        /// any previous attempt first. The retryable result is passed as the inner state.
        /// </summary>
        private void BeginSendImpl(ServiceRequest request, ExecutionContext context,
                                   RetryableAsyncResult asyncResult)
        {
            if (asyncResult.InnerAsyncResult != null)
            {
                asyncResult.InnerAsyncResult.Dispose();
            }

            asyncResult.InnerAsyncResult =
                _innerClient.BeginSend(request, context, asyncResult.Callback, asyncResult) as AsyncResult;
        }

        /// <summary>
        /// Completes an asynchronous attempt. On failure the request content is rewound and,
        /// if the failure is retryable, a new attempt is started before the exception is rethrown.
        /// </summary>
        /// <param name="ar">
        /// The inner attempt's result: it must be an <see cref="AsyncResult{T}"/> of
        /// <see cref="ServiceResponse"/> whose <see cref="IAsyncResult.AsyncState"/> is the
        /// <see cref="RetryableAsyncResult"/> created by <see cref="BeginSend"/>.
        /// </param>
        /// <returns>The response of the completed attempt.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="ar"/> is null.</exception>
        /// <exception cref="InvalidOperationException">
        /// <paramref name="ar"/> or its state is not of the expected type.
        /// </exception>
        public ServiceResponse EndSend(IAsyncResult ar)
        {
            if (ar == null)
            {
                throw new ArgumentNullException("ar");
            }

            var asyncResult = ar as AsyncResult<ServiceResponse>;
            RetryableAsyncResult retryableAsyncResult = ar.AsyncState as RetryableAsyncResult;
            if (asyncResult == null || retryableAsyncResult == null)
            {
                throw new InvalidOperationException("Invalid asynchronous invocation status.");
            }

            try
            {
                var response = asyncResult.GetResult();
                return response;
            }
            catch (Exception ex)
            {
                // Rewind the body to where it was when BeginSend was called.
                if (retryableAsyncResult.OriginalContentPosition >= 0)
                {
                    retryableAsyncResult.Request.Content.Seek(
                        retryableAsyncResult.OriginalContentPosition, SeekOrigin.Begin);
                }

                if (ShouldRetry(retryableAsyncResult.Request, ex, retryableAsyncResult.Retries))
                {
                    Pause(retryableAsyncResult.Retries++);
                    BeginSendImpl(retryableAsyncResult.Request,
                                  retryableAsyncResult.Context,
                                  retryableAsyncResult);
                }

                // NOTE(review): the exception is rethrown even when a retry has just been
                // started above; presumably callers are expected to cope with that - confirm.
                // Rethrow
                throw;
            }
            finally
            {
                asyncResult.Dispose();
            }
        }

        /// <summary>
        /// Decides whether a failed attempt should be retried.
        /// </summary>
        /// <param name="request">The request that failed.</param>
        /// <param name="ex">The exception raised by the failed attempt.</param>
        /// <param name="retryTimes">Number of retries already performed.</param>
        /// <returns>
        /// False when the retry budget is used up or the request is not repeatable; otherwise
        /// true for an HTTP 500/503 <see cref="WebException"/> or when
        /// <see cref="ShouldRetryCallback"/> accepts the exception.
        /// </returns>
        private bool ShouldRetry(ServiceRequest request, Exception ex, int retryTimes)
        {
            // '>=' (not '>'): retryTimes counts retries already done, so once it reaches
            // MaxRetryTimes the budget is exhausted. '>' allowed one retry too many.
            if (retryTimes >= MaxRetryTimes || !request.IsRepeatable)
            {
                return false;
            }

            var webException = ex as WebException;
            if (webException != null)
            {
                var httpWebResponse = webException.Response as HttpWebResponse;
                if (httpWebResponse != null &&
                    (httpWebResponse.StatusCode == HttpStatusCode.ServiceUnavailable ||
                     httpWebResponse.StatusCode == HttpStatusCode.InternalServerError))
                {
                    return true;
                }
            }

            if (ShouldRetryCallback != null && ShouldRetryCallback(ex))
            {
                return true;
            }

            return false;
        }

        /// <summary>
        /// Blocks the calling thread for 2^<paramref name="retryTimes"/> * 300 milliseconds,
        /// clamped to <see cref="int.MaxValue"/> milliseconds.
        /// </summary>
        /// <param name="retryTimes">Number of retries already performed.</param>
        private static void Pause(int retryTimes)
        {
            // make the pause time increase exponentially based on an assumption
            // that the more times it retries, the less probability it succeeds.
            //
            // The product is computed in 64-bit and clamped: the former 32-bit product
            // overflowed for retryTimes >= 23, which could pass Thread.Sleep a negative
            // timeout and make it throw.
            int exponent = Math.Min(retryTimes, MaxPauseExponent);
            long delay = (long)Math.Pow(2, exponent) * DefaultRetryPauseScale;
            Thread.Sleep((int)Math.Min(delay, int.MaxValue));
        }

        #endregion
    }

    /// <summary>
    /// Async result for a retryable send: keeps the request, context, retry count, the
    /// content position captured at construction, and the current inner attempt's result.
    /// </summary>
    internal class RetryableAsyncResult : AsyncResult<ServiceResponse>
    {
        /// <summary>The request being sent.</summary>
        public ServiceRequest Request { get; private set; }

        /// <summary>The execution context passed to every attempt.</summary>
        public ExecutionContext Context { get; private set; }

        /// <summary>The async result of the attempt currently in flight, if any.</summary>
        public AsyncResult InnerAsyncResult { get; set; }

        /// <summary>Number of retries performed so far.</summary>
        public int Retries { get; set; }

        /// <summary>
        /// Position of the request content when this object was created, or -1 when the
        /// request has no content or the content is not seekable.
        /// </summary>
        public long OriginalContentPosition { get; private set; }

        /// <summary>
        /// Captures the request, context and current content position for later retries.
        /// </summary>
        /// <param name="callback">Callback forwarded to the base async result.</param>
        /// <param name="state">Caller state forwarded to the base async result.</param>
        /// <param name="request">The request being sent.</param>
        /// <param name="context">The execution context for the request.</param>
        public RetryableAsyncResult(AsyncCallback callback, object state,
                                    ServiceRequest request, ExecutionContext context)
            : base(callback, state)
        {
            Request = request;
            Context = context;
            OriginalContentPosition = (request.Content != null && request.Content.CanSeek)
                ? request.Content.Position
                : -1;
        }

        /// <summary>
        /// Disposes the base result and, when disposing, the inner attempt's result as well.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);

            if (disposing && InnerAsyncResult != null)
            {
                InnerAsyncResult.Dispose();
                InnerAsyncResult = null;
            }
        }
    }
}