in src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs [78:209]
public TResponse Request<TResponse>(Endpoint endpoint, BoundConfiguration boundConfiguration, PostData? postData)
where TResponse : TransportResponse, new() =>
RequestCoreAsync<TResponse>(false, endpoint, boundConfiguration, postData).EnsureCompleted();
/// <inheritdoc cref="IRequestInvoker.RequestAsync{TResponse}" />
public Task<TResponse> RequestAsync<TResponse>(Endpoint endpoint, BoundConfiguration boundConfiguration, PostData? postData, CancellationToken cancellationToken)
where TResponse : TransportResponse, new() =>
RequestCoreAsync<TResponse>(true, endpoint, boundConfiguration, postData, cancellationToken).AsTask();
private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, Endpoint endpoint, BoundConfiguration boundConfiguration, PostData? postData, CancellationToken cancellationToken = default)
where TResponse : TransportResponse, new()
{
var client = GetClient(boundConfiguration);
HttpResponseMessage responseMessage;
int? statusCode = null;
Stream responseStream = null;
Exception ex = null;
string contentType = null;
long contentLength = -1;
IDisposable receivedResponse = DiagnosticSources.SingletonDisposable;
ReadOnlyDictionary<TcpState, int> tcpStats = null;
ReadOnlyDictionary<string, ThreadPoolStatistics> threadPoolStats = null;
Dictionary<string, IEnumerable<string>> responseHeaders = null;
var beforeTicks = Stopwatch.GetTimestamp();
try
{
var requestMessage = CreateHttpRequestMessage(endpoint, boundConfiguration, isAsync);
if (postData is not null)
{
if (isAsync)
await SetContentAsync(requestMessage, boundConfiguration, postData, cancellationToken).ConfigureAwait(false);
else
SetContent(requestMessage, boundConfiguration, postData);
}
using (requestMessage?.Content ?? (IDisposable)Stream.Null)
{
if (boundConfiguration.EnableTcpStats)
tcpStats = TcpStats.GetStates();
if (boundConfiguration.EnableThreadPoolStats)
threadPoolStats = ThreadPoolStats.GetStats();
var prepareRequestMs = (Stopwatch.GetTimestamp() - beforeTicks) / (Stopwatch.Frequency / 1000);
if (prepareRequestMs > OpenTelemetry.MinimumMillisecondsToEmitTimingSpanAttribute && OpenTelemetry.CurrentSpanIsElasticTransportOwnedHasListenersAndAllDataRequested)
Activity.Current?.SetTag(OpenTelemetryAttributes.ElasticTransportPrepareRequestMs, prepareRequestMs);
if (isAsync)
responseMessage = await client.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken)
.ConfigureAwait(false);
else
#if NET6_0_OR_GREATER
responseMessage = client.Send(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
#else
responseMessage = client.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken).GetAwaiter().GetResult();
#endif
receivedResponse = responseMessage;
statusCode = (int)responseMessage.StatusCode;
}
contentType = responseMessage.Content.Headers.ContentType?.ToString();
responseHeaders = ParseHeaders(boundConfiguration, responseMessage);
if (responseMessage.Content != null)
{
if (isAsync)
#if NET6_0_OR_GREATER
responseStream = await responseMessage.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false);
#else
responseStream = await responseMessage.Content.ReadAsStreamAsync().ConfigureAwait(false);
#endif
else
#if NET6_0_OR_GREATER
responseStream = responseMessage.Content.ReadAsStream(cancellationToken);
#else
responseStream = responseMessage.Content.ReadAsStreamAsync().GetAwaiter().GetResult();
#endif
}
// We often won't have the content length as most responses are GZip compressed and the HttpContent ditches this when AutomaticDecompression is enabled.
contentLength = responseMessage.Content.Headers.ContentLength ?? -1;
}
catch (TaskCanceledException e)
{
ex = e;
}
catch (HttpRequestException e)
{
ex = e;
}
TResponse response;
try
{
if (isAsync)
response = await ResponseFactory.CreateAsync<TResponse>
(endpoint, boundConfiguration, postData, ex, statusCode, responseHeaders, responseStream, contentType, contentLength, threadPoolStats, tcpStats, cancellationToken)
.ConfigureAwait(false);
else
response = ResponseFactory.Create<TResponse>
(endpoint, boundConfiguration, postData, ex, statusCode, responseHeaders, responseStream, contentType, contentLength, threadPoolStats, tcpStats);
// Unless indicated otherwise by the TransportResponse, we've now handled the response stream, so we can dispose of the HttpResponseMessage
// to release the connection. In cases, where the derived response works directly on the stream, it can be left open and additional IDisposable
// resources can be linked such that their disposal is deferred.
if (response.LeaveOpen)
{
response.LinkedDisposables = [receivedResponse, responseStream];
}
else
{
responseStream?.Dispose();
receivedResponse?.Dispose();
}
RequestInvokerHelpers.SetOtelAttributes(boundConfiguration, response);
return response;
}
catch
{
// if there's an exception, ensure we always release the stream and response so that the connection is freed.
responseStream?.Dispose();
receivedResponse?.Dispose();
throw;
}
}