in src/common/Kubernetes/KubernetesClient.cs [1059:1151]
private async Task<T> ClientInvokeWrapperAsync<T>(Func<Task<T>> handler, string operation, CancellationToken cancellationToken = default(CancellationToken))
{
T result = default(T);
var maxWait = TimeSpan.FromSeconds(30);
var interval = TimeSpan.FromMilliseconds(100);
string method = $"{nameof(KubernetesClient)}.{nameof(ClientInvokeWrapperAsync)}";
var operationProperties = new Dictionary<string, object>(_loggingProperties);
bool success = await WebUtilities.RetryUntilTimeAsync(async t =>
{
try
{
result = await handler();
if (result == null)
{
throw new UnauthorizedAccessException("Kubernetes returned an Unauthorized (401) status code");
}
return true;
}
catch (HttpOperationException opEx) when (opEx.Response != null)
{
if (opEx.Response.StatusCode == HttpStatusCode.NotFound)
{
result = default(T);
return true;
}
_log.Error($"{operation} threw {nameof(HttpOperationException)}: StatusCode='{opEx.Response.StatusCode}', ReasonPhrase='{opEx.Response.ReasonPhrase}', Content='{new PII(opEx.Response.Content)}'");
if (!string.IsNullOrEmpty(opEx.Response.Content))
{
// Try to construct an error message that takes advantage of all the info available to us
var deserializedContent = JsonHelpers.DeserializeObject<Dictionary<string, object>>(opEx.Response.Content);
if (deserializedContent != null && deserializedContent.TryGetValue("message", out object errorMessage))
{
var errorString = $"{opEx.Response.ReasonPhrase}: {new PII(errorMessage.ToString())}";
if (opEx.Response.StatusCode == HttpStatusCode.Forbidden)
{
// Include aka.ms link if this is an RBAC error
errorString = string.Format(CommonResources.IncludeRBACInformationFormat, Constants.Product.Name, "https://aka.ms/bridge-to-k8s-rbac", errorString);
}
throw new UserVisibleException(_log.OperationContext, errorString);
}
}
throw;
}
catch (HttpRequestException httpEx) when (httpEx.Message.Contains("No such host is known"))
{
this._log.Warning(httpEx.Message);
throw new InvalidUsageException(this._log.OperationContext, CommonResources.NoKubernetesClusterFound, httpEx);
}
catch (HttpRequestException httpEx) when (httpEx.GetInnermostException() is IOException)
{
// Retryable error
var error = $"{method} encountered retryable {nameof(HttpRequestException)}: {httpEx.Message}: {httpEx.GetInnermostException().Message}: {t.ToString()}";
operationProperties[Property.Error] = error;
this._log.Warning(error);
}
catch (HttpRequestException httpEx) when (httpEx.GetInnermostException() is SocketException)
{
// Retryable error
var error = $"{method} encountered retryable {nameof(HttpRequestException)}: {httpEx.Message}: {httpEx.GetInnermostException().Message}: {t.ToString()}";
operationProperties[Property.Error] = error;
this._log.Warning(error);
}
catch (OperationCanceledException operationCanceledEx) when (operationCanceledEx.GetInnermostException() is IOException)
{
// Retryable error
var error = $"{method} encountered retryable {nameof(OperationCanceledException)}: Error: {operationCanceledEx.ToString()}: {t.ToString()}";
operationProperties[Property.Error] = error;
this._log.Warning(error);
}
catch (IOException ioex)
{
// Retryable error
var error = $"{method} encountered retryable {nameof(IOException)}: {ioex.Message}: {t.ToString()}";
operationProperties[Property.Error] = error;
this._log.Warning(error);
}
await Task.Delay(interval, cancellationToken);
return false;
}, maxWait, cancellationToken);
if (!success)
{
this._log.Dependency(Dependency.Kubernetes, operation, success: false, properties: operationProperties);
throw new TimeoutException($"{method} timed out after {maxWait.TotalSeconds} seconds");
}
this._log.Dependency(Dependency.Kubernetes, operation, success: true, properties: operationProperties);
return result;
}