tools/code/common/Http.cs (388 lines of code) (raw):

using Azure; using Azure.Core; using Azure.Core.Pipeline; using LanguageExt; using LanguageExt.UnsafeValueAccess; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Linq; using System.Net; using System.Net.Http; using System.Net.Http.Headers; using System.Runtime.CompilerServices; using System.Text.Json; using System.Text.Json.Nodes; using System.Threading; using System.Threading.Tasks; namespace common; public static class HttpPipelineExtensions { public static async ValueTask<BinaryData> GetContent(this HttpPipeline pipeline, Uri uri, CancellationToken cancellationToken) { var either = await pipeline.TryGetContent(uri, cancellationToken); return either.IfLeftThrow(uri); } /// <summary> /// Gets the response content. If the status code is <see cref="HttpStatusCode.NotFound"/>, returns <see cref="Option.None"/>. /// </summary> public static async ValueTask<Option<BinaryData>> GetContentOption(this HttpPipeline pipeline, Uri uri, CancellationToken cancellationToken) { var either = await pipeline.TryGetContent(uri, cancellationToken); return either.Match(response => { using (response) { return response.Status == (int)HttpStatusCode.NotFound ? Option<BinaryData>.None : throw response.ToHttpRequestException(uri); } }, Option<BinaryData>.Some); } public static async ValueTask<Either<Response, BinaryData>> TryGetContent(this HttpPipeline pipeline, Uri uri, CancellationToken cancellationToken) { using var request = pipeline.CreateRequest(uri, RequestMethod.Get); var response = await pipeline.SendRequestAsync(request, cancellationToken); if (response.IsError) { return response; } else { using (response) { return response.Content; } } } public static HttpRequestException ToHttpRequestException(this Response response, Uri requestUri) => new(message: $"HTTP request to URI {requestUri} failed with status code {response.Status}. Content is '{response.Content}'.", inner: null, statusCode: (HttpStatusCode)response.Status); private static T IfLeftThrow<T>(this Either<Response, T> either, Uri requestUri) => either.IfLeft(response => { using (response) { throw response.ToHttpRequestException(requestUri); } }); public static async IAsyncEnumerable<JsonObject> ListJsonObjects(this HttpPipeline pipeline, Uri uri, [EnumeratorCancellation] CancellationToken cancellationToken) { Uri? nextLink = uri; while (nextLink is not null) { var responseJson = await pipeline.GetJsonObject(nextLink, cancellationToken); var values = responseJson.TryGetJsonArrayProperty("value") .IfLeft(() => []) .PickJsonObjects(); foreach (var value in values) { yield return value; } nextLink = responseJson.TryGetAbsoluteUriProperty("nextLink") .ValueUnsafe(); } } public static async ValueTask<JsonObject> GetJsonObject(this HttpPipeline pipeline, Uri uri, CancellationToken cancellationToken) { var either = await pipeline.TryGetJsonObject(uri, cancellationToken); return either.IfLeftThrow(uri); } /// <summary> /// Gets the response content as a JSON object. If the status code is <see cref="HttpStatusCode.NotFound"/>, returns <see cref="Option.None"/>. /// </summary> public static async ValueTask<Option<JsonObject>> GetJsonObjectOption(this HttpPipeline pipeline, Uri uri, CancellationToken cancellationToken) { var option = await pipeline.GetContentOption(uri, cancellationToken); return option.Map(content => content.ToObjectFromJson<JsonObject>()); } public static async ValueTask<Either<Response, JsonObject>> TryGetJsonObject(this HttpPipeline pipeline, Uri uri, CancellationToken cancellationToken) { var either = await pipeline.TryGetContent(uri, cancellationToken); return either.Map(content => content.ToObjectFromJson<JsonObject>()); } public static async ValueTask DeleteResource(this HttpPipeline pipeline, Uri uri, bool waitForCompletion, CancellationToken cancellationToken) { var either = await pipeline.TryDeleteResource(uri, waitForCompletion, cancellationToken); either.IfLeftThrow(uri); } public static async ValueTask<Either<Response, Unit>> TryDeleteResource(this HttpPipeline pipeline, Uri uri, bool waitForCompletion, CancellationToken cancellationToken) { using var request = pipeline.CreateRequest(uri, RequestMethod.Delete); var response = await pipeline.SendRequestAsync(request, cancellationToken); if (response.IsError) { return response; }; using (response) { if (waitForCompletion) { var operationResponse = await pipeline.WaitForLongRunningOperation(response, cancellationToken); if (operationResponse.IsError) { return operationResponse; } else { using (operationResponse) { return Unit.Default; } } } else { return Unit.Default; } } } public static async ValueTask PutContent(this HttpPipeline pipeline, Uri uri, BinaryData content, CancellationToken cancellationToken) { var either = await pipeline.TryPutContent(uri, content, cancellationToken); #pragma warning disable CA1806 // Do not ignore method results either.IfLeft(response => throw response.ToHttpRequestException(uri)); #pragma warning restore CA1806 // Do not ignore method results } public static async ValueTask<Either<Response, Unit>> TryPutContent(this HttpPipeline pipeline, Uri uri, BinaryData content, CancellationToken cancellationToken) { using var request = pipeline.CreateRequest(uri, RequestMethod.Put); request.Content = RequestContent.Create(content); request.Headers.Add("Content-type", "application/json"); var response = await pipeline.SendRequestAsync(request, cancellationToken); if (response.IsError) { return response; }; using (response) { var operationResponse = await pipeline.WaitForLongRunningOperation(response, cancellationToken); if (operationResponse.IsError) { return operationResponse; } else { using (operationResponse) { return Unit.Default; } } } } public static async ValueTask PatchContent(this HttpPipeline pipeline, Uri uri, BinaryData content, CancellationToken cancellationToken) { var either = await pipeline.TryPatchContent(uri, content, cancellationToken); #pragma warning disable CA1806 // Do not ignore method results either.IfLeft(response => throw response.ToHttpRequestException(uri)); #pragma warning restore CA1806 // Do not ignore method results } public static async ValueTask<Either<Response, Unit>> TryPatchContent(this HttpPipeline pipeline, Uri uri, BinaryData content, CancellationToken cancellationToken) { using var request = pipeline.CreateRequest(uri, RequestMethod.Patch); request.Content = RequestContent.Create(content); request.Headers.Add("Content-type", "application/json"); var response = await pipeline.SendRequestAsync(request, cancellationToken); if (response.IsError) { return response; }; using (response) { var operationResponse = await pipeline.WaitForLongRunningOperation(response, cancellationToken); if (operationResponse.IsError) { return operationResponse; } else { using (operationResponse) { return Unit.Default; } } } } public static Request CreateRequest(this HttpPipeline pipeline, Uri uri, RequestMethod requestMethod) { var request = pipeline.CreateRequest(); request.Uri.Reset(uri); request.Method = requestMethod; return request; } private static async ValueTask<Response> WaitForLongRunningOperation(this HttpPipeline pipeline, Response response, CancellationToken cancellationToken) { var updatedResponse = response; while (((updatedResponse.Status is (int)HttpStatusCode.OK or (int)HttpStatusCode.Created && IsProvisioningInProgress(updatedResponse)) || updatedResponse.Status == (int)HttpStatusCode.Accepted) && updatedResponse.Headers.TryGetValue("Location", out var locationHeaderValue) && Uri.TryCreate(locationHeaderValue, UriKind.Absolute, out var locationUri) && locationUri is not null) { if (updatedResponse.Headers.TryGetValue("Retry-After", out var retryAfterString) && int.TryParse(retryAfterString, out var retryAfterSeconds)) { var retryAfterDuration = TimeSpan.FromSeconds(retryAfterSeconds); await Task.Delay(retryAfterDuration, cancellationToken); } else { await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); } using var request = pipeline.CreateRequest(locationUri, RequestMethod.Get); updatedResponse = await pipeline.SendRequestAsync(request, cancellationToken); if (updatedResponse.IsError) { throw updatedResponse.ToHttpRequestException(locationUri); } } return updatedResponse; } private static bool IsProvisioningInProgress(Response response) { try { return response.Content.ToObjectFromJson<JsonObject>() .TryGetJsonObjectProperty("properties") .Bind(json => json.TryGetStringProperty("ProvisioningState")) .ToOption() .Where(state => state.Equals("InProgress", StringComparison.OrdinalIgnoreCase)) .IsSome; } catch (JsonException) { return false; } } } public sealed class ILoggerHttpPipelinePolicy(ILogger logger) : HttpPipelinePolicy { public override void Process(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline) { ProcessAsync(message, pipeline).AsTask().GetAwaiter().GetResult(); } public override async ValueTask ProcessAsync(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline) { if (logger.IsEnabled(LogLevel.Trace)) { logger.LogTrace(""" Starting request Method: {HttpMethod} Uri: {Uri} Content: {RequestContent} """, message.Request.Method, message.Request.Uri, await GetRequestContent(message, message.CancellationToken)); } else if (logger.IsEnabled(LogLevel.Debug)) { logger.LogDebug(""" Starting request Method: {HttpMethod} Uri: {Uri} """, message.Request.Method, message.Request.Uri); } var startTime = Stopwatch.GetTimestamp(); await ProcessNextAsync(message, pipeline); var endTime = Stopwatch.GetTimestamp(); var duration = TimeSpan.FromSeconds((endTime - startTime) / (double)Stopwatch.Frequency); if (logger.IsEnabled(LogLevel.Trace)) { logger.LogTrace(""" Received response Method: {HttpMethod} Uri: {Uri} Status code: {StatusCode} Duration (hh:mm:ss): {Duration} Content: {ResponseContent} """, message.Request.Method, message.Request.Uri, message.Response.Status, duration.ToString("c"), GetResponseContent(message)); } else if (logger.IsEnabled(LogLevel.Debug)) { logger.LogDebug(""" Received response Method: {HttpMethod} Uri: {Uri} Status code: {StatusCode} Duration (hh:mm:ss): {Duration} """, message.Request.Method, message.Request.Uri, message.Response.Status, duration.ToString("c")); } } private static async ValueTask<string> GetRequestContent(HttpMessage message, CancellationToken cancellationToken) { if (message.Request.Content is null) { return "<null>"; } else if (HeaderIsJson(message.Request.Headers)) { using var stream = new MemoryStream(); await message.Request.Content.WriteToAsync(stream, cancellationToken); stream.Position = 0; var data = await BinaryData.FromStreamAsync(stream, cancellationToken); return data.ToString(); } else { return "<non-json>"; } } private static bool HeaderIsJson(IEnumerable<HttpHeader> headers) => headers.Any(header => header.Name.Equals("Content-Type", StringComparison.OrdinalIgnoreCase) && header.Value.Contains("application/json", StringComparison.OrdinalIgnoreCase)); private static string GetResponseContent(HttpMessage message) => message.Response.Content is null ? "<null>" : HeaderIsJson(message.Response.Headers) ? message.Response.Content.ToString() : "<non-json>"; } public class CommonRetryPolicy : RetryPolicy { protected override bool ShouldRetry(HttpMessage message, Exception? exception) => base.ShouldRetry(message, exception) || ShouldRetryInner(message, exception); protected override async ValueTask<bool> ShouldRetryAsync(HttpMessage message, Exception? exception) => await base.ShouldRetryAsync(message, exception) || ShouldRetryInner(message, exception); private static bool ShouldRetryInner(HttpMessage message, Exception? exception) { try { return (message, exception) switch { ({ Response.Status: 422 or 409 }, _) when HasManagementApiRequestFailedError(message.Response) => true, ({ Response.Status: 412 }, _) => true, ({ Response.Status: 429 }, _) => true, _ => false }; } catch (InvalidOperationException) { return false; } } private static bool HasManagementApiRequestFailedError(Response response) => TryGetErrorCode(response) .Where(code => code.Equals("ManagementApiRequestFailed", StringComparison.OrdinalIgnoreCase)) .IsSome; private static Option<string> TryGetErrorCode(Response response) { try { return response.Content .ToObjectFromJson<JsonObject>() .TryGetJsonObjectProperty("error") .Bind(error => error.TryGetStringProperty("code")) .ToOption(); } catch (Exception exception) when (exception is ArgumentNullException or NotSupportedException or JsonException) { return Option<string>.None; } } } public class TelemetryPolicy(Version version) : HttpPipelinePolicy { public override void Process(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline) { ProcessAsync(message, pipeline).AsTask().GetAwaiter().GetResult(); } public override async ValueTask ProcessAsync(HttpMessage message, ReadOnlyMemory<HttpPipelinePolicy> pipeline) { var header = new ProductHeaderValue("apimanagement-apiops", version.ToString()); message.Request.Headers.Add(HttpHeader.Names.UserAgent, header.ToString()); await ProcessNextAsync(message, pipeline); } }