in src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs [60:208]
public TResponse Request<TResponse>(Endpoint endpoint, BoundConfiguration boundConfiguration, PostData? postData)
    where TResponse : TransportResponse, new()
{
    // Synchronous entry point: run the shared core with isAsync = false and
    // unwrap the resulting ValueTask through EnsureCompleted().
    var pending = RequestCoreAsync<TResponse>(isAsync: false, endpoint, boundConfiguration, postData);
    return pending.EnsureCompleted();
}
/// <inheritdoc cref="IRequestInvoker.RequestAsync{TResponse}"/>
public Task<TResponse> RequestAsync<TResponse>(Endpoint endpoint, BoundConfiguration boundConfiguration, PostData? postData, CancellationToken cancellationToken = default)
    where TResponse : TransportResponse, new()
{
    // Asynchronous entry point: run the shared core with isAsync = true, forwarding the
    // caller's cancellation token, and expose the ValueTask as a Task.
    var pending = RequestCoreAsync<TResponse>(isAsync: true, endpoint, boundConfiguration, postData, cancellationToken);
    return pending.AsTask();
}
/// <summary>
/// Shared implementation behind <c>Request</c> and <c>RequestAsync</c>. Creates the
/// <see cref="HttpWebRequest"/>, writes the optional request body (gzip-compressed when
/// <c>boundConfiguration.HttpCompression</c> is set), obtains the response and passes the
/// collected details to <c>ResponseFactory</c> to build the <typeparamref name="TResponse"/>.
/// </summary>
/// <param name="isAsync">
/// When <c>true</c> the APM Begin/End pairs and the asynchronous writer/factory methods are used;
/// when <c>false</c> the blocking equivalents are used.
/// </param>
/// <param name="endpoint">The endpoint the request is created for.</param>
/// <param name="boundConfiguration">Per-request configuration (compression, statistics switches, settings).</param>
/// <param name="postData">Optional request body; nothing is written when <c>null</c>.</param>
/// <param name="cancellationToken">Aborts the underlying web request when cancelled.</param>
/// <returns>The response produced by <c>ResponseFactory</c>.</returns>
/// <remarks>
/// A <see cref="WebException"/> is captured and handed to the response factory rather than rethrown.
/// Any other exception propagates to the caller after the response stream and response object
/// obtained so far have been disposed.
/// </remarks>
private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Endpoint endpoint, BoundConfiguration boundConfiguration, PostData? postData, CancellationToken cancellationToken = default)
    where TResponse : TransportResponse, new()
{
    Action unregisterWaitHandle = null;
    int? statusCode = null;
    Stream responseStream = null;
    Exception ex = null;
    string contentType = null;
    long contentLength = -1;
    // Placeholder disposable until a real response is received, so cleanup code never has to special-case "no response".
    IDisposable receivedResponse = DiagnosticSources.SingletonDisposable;
    ReadOnlyDictionary<TcpState, int> tcpStats = null;
    ReadOnlyDictionary<string, ThreadPoolStatistics> threadPoolStats = null;
    Dictionary<string, IEnumerable<string>> responseHeaders = null;
    var beforeTicks = Stopwatch.GetTimestamp();

    // The outer try guarantees that the stream and response are released on ANY failure,
    // including non-WebException failures raised after the response has already been received.
    try
    {
        try
        {
            var data = postData;
            var request = CreateHttpWebRequest(endpoint, boundConfiguration, postData, isAsync);
            // Cancellation is implemented by aborting the in-flight request.
            using (cancellationToken.Register(() => request.Abort()))
            {
                if (data is not null)
                {
                    if (isAsync)
                    {
                        var apmGetRequestStreamTask =
                            Task.Factory.FromAsync(request.BeginGetRequestStream, request.EndGetRequestStream, null);
                        unregisterWaitHandle = RegisterApmTaskTimeout(apmGetRequestStreamTask, request, boundConfiguration);
                        using (var stream = await apmGetRequestStreamTask.ConfigureAwait(false))
                        {
                            if (boundConfiguration.HttpCompression)
                            {
                                using var zipStream = new GZipStream(stream, CompressionMode.Compress);
                                await data.WriteAsync(zipStream, boundConfiguration.ConnectionSettings, boundConfiguration.DisableDirectStreaming, cancellationToken).ConfigureAwait(false);
                            }
                            else
                                await data.WriteAsync(stream, boundConfiguration.ConnectionSettings, boundConfiguration.DisableDirectStreaming, cancellationToken).ConfigureAwait(false);
                        }
                        // The request-stream phase is done; drop its timeout registration before the response phase registers its own.
                        unregisterWaitHandle?.Invoke();
                    }
                    else
                    {
                        using var stream = request.GetRequestStream();
                        if (boundConfiguration.HttpCompression)
                        {
                            using var zipStream = new GZipStream(stream, CompressionMode.Compress);
                            data.Write(zipStream, boundConfiguration.ConnectionSettings, boundConfiguration.DisableDirectStreaming);
                        }
                        else
                            data.Write(stream, boundConfiguration.ConnectionSettings, boundConfiguration.DisableDirectStreaming);
                    }
                }

                // Time spent creating the request and writing the body, in milliseconds.
                var prepareRequestMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
                if (prepareRequestMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
                    Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportPrepareRequestMs, prepareRequestMs);

                //http://msdn.microsoft.com/en-us/library/system.net.httpwebresponse.getresponsestream.aspx
                //Either the stream or the response object needs to be closed but not both although it won't
                //throw any errors if both are closed; at least one of them has to be closed.
                //Since we expose the stream we let closing the stream determine when to close the connection

                if (boundConfiguration.EnableTcpStats)
                    tcpStats = TcpStats.GetStates();

                if (boundConfiguration.EnableThreadPoolStats)
                    threadPoolStats = ThreadPoolStats.GetStats();

                HttpWebResponse httpWebResponse;
                if (isAsync)
                {
                    var apmGetResponseTask = Task.Factory.FromAsync(request.BeginGetResponse, r => request.EndGetResponse(r), null);
                    unregisterWaitHandle = RegisterApmTaskTimeout(apmGetResponseTask, request, boundConfiguration);
                    httpWebResponse = (HttpWebResponse)await apmGetResponseTask.ConfigureAwait(false);
                }
                else
                {
                    httpWebResponse = (HttpWebResponse)request.GetResponse();
                }

                // Track the response before touching it so the outer catch can release it if anything below throws.
                receivedResponse = httpWebResponse;
                HandleResponse(httpWebResponse, out statusCode, out responseStream, out contentType);
                responseHeaders = ParseHeaders(boundConfiguration, httpWebResponse, responseHeaders);
                contentLength = httpWebResponse.ContentLength;
            }
        }
        catch (WebException e)
        {
            // Captured rather than rethrown: the response factory receives the exception together
            // with whatever could be read from the error response.
            ex = e;
            if (e.Response is HttpWebResponse errorResponse)
            {
                // Track the error response as well, so it is disposed (or linked when LeaveOpen is set)
                // exactly like a successful response.
                receivedResponse = errorResponse;
                HandleResponse(errorResponse, out statusCode, out responseStream, out contentType);
            }
        }
        finally
        {
            unregisterWaitHandle?.Invoke();
        }

        TResponse response;

        if (isAsync)
            response = await ResponseFactory.CreateAsync<TResponse>
                    (endpoint, boundConfiguration, postData, ex, statusCode, responseHeaders, responseStream, contentType, contentLength, threadPoolStats, tcpStats, cancellationToken)
                .ConfigureAwait(false);
        else
            response = ResponseFactory.Create<TResponse>
                (endpoint, boundConfiguration, postData, ex, statusCode, responseHeaders, responseStream, contentType, contentLength, threadPoolStats, tcpStats);

        // Unless indicated otherwise by the TransportResponse, we've now handled the response stream, so we can dispose of the HttpWebResponse
        // to release the connection. In cases where the derived response works directly on the stream, it can be left open and additional IDisposable
        // resources can be linked such that their disposal is deferred.
        if (response.LeaveOpen)
        {
            response.LinkedDisposables = [receivedResponse, responseStream];
        }
        else
        {
            responseStream?.Dispose();
            receivedResponse?.Dispose();
        }

        RequestInvokerHelpers.SetOtelAttributes(boundConfiguration, response);

        return response;
    }
    catch
    {
        // if there's an exception, ensure we always release the stream and response so that the connection is freed.
        responseStream?.Dispose();
        receivedResponse?.Dispose();
        throw;
    }
}