in src/Elastic.Clients.Elasticsearch/_Shared/Client/ElasticsearchClient.cs [99:246]
/// <summary>
/// Sends <paramref name="request"/> without supplying a forced request configuration.
/// </summary>
/// <remarks>
/// Pure convenience overload: forwards to the two-argument <c>DoRequest</c> overload with
/// <c>null</c> as the configuration callback.
/// </remarks>
/// <typeparam name="TRequest">The concrete request type.</typeparam>
/// <typeparam name="TResponse">The response type produced by the transport.</typeparam>
/// <typeparam name="TRequestParameters">The request parameters type associated with <typeparamref name="TRequest"/>.</typeparam>
/// <param name="request">The request to send.</param>
/// <returns>The response returned by the two-argument overload.</returns>
internal TResponse DoRequest<TRequest, TResponse, TRequestParameters>(TRequest request)
    where TRequest : Request<TRequestParameters>
    where TResponse : TransportResponse, new()
    where TRequestParameters : RequestParameters, new()
{
    return DoRequest<TRequest, TResponse, TRequestParameters>(request, forceConfiguration: null);
}
/// <summary>
/// Sends <paramref name="request"/> through the shared request core in its non-async mode and
/// returns the completed response.
/// </summary>
/// <remarks>
/// Calls <c>DoRequestCoreAsync</c> with <c>isAsync</c> set to <c>false</c> and no cancellation token,
/// then unwraps the returned <see cref="ValueTask{TResult}"/> via <c>EnsureCompleted()</c>.
/// </remarks>
/// <typeparam name="TRequest">The concrete request type.</typeparam>
/// <typeparam name="TResponse">The response type produced by the transport.</typeparam>
/// <typeparam name="TRequestParameters">The request parameters type associated with <typeparamref name="TRequest"/>.</typeparam>
/// <param name="request">The request to send.</param>
/// <param name="forceConfiguration">Optional configuration callback, passed through unchanged to the request core.</param>
/// <returns>The response obtained from the request core.</returns>
internal TResponse DoRequest<TRequest, TResponse, TRequestParameters>(
    TRequest request,
    Action<IRequestConfiguration>? forceConfiguration)
    where TRequest : Request<TRequestParameters>
    where TResponse : TransportResponse, new()
    where TRequestParameters : RequestParameters, new()
{
    var pendingResponse = DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(
        isAsync: false,
        request: request,
        forceConfiguration: forceConfiguration);

    return pendingResponse.EnsureCompleted();
}
/// <summary>
/// Asynchronously sends <paramref name="request"/> without supplying a forced request configuration.
/// </summary>
/// <remarks>
/// Pure convenience overload: forwards to the three-argument <c>DoRequestAsync</c> overload with
/// <c>null</c> as the configuration callback.
/// </remarks>
/// <typeparam name="TRequest">The concrete request type.</typeparam>
/// <typeparam name="TResponse">The response type produced by the transport.</typeparam>
/// <typeparam name="TRequestParameters">The request parameters type associated with <typeparamref name="TRequest"/>.</typeparam>
/// <param name="request">The request to send.</param>
/// <param name="cancellationToken">Token forwarded to the three-argument overload.</param>
/// <returns>The task returned by the three-argument overload.</returns>
internal Task<TResponse> DoRequestAsync<TRequest, TResponse, TRequestParameters>(
    TRequest request,
    CancellationToken cancellationToken = default)
    where TRequest : Request<TRequestParameters>
    where TResponse : TransportResponse, new()
    where TRequestParameters : RequestParameters, new()
{
    return DoRequestAsync<TRequest, TResponse, TRequestParameters>(
        request,
        forceConfiguration: null,
        cancellationToken: cancellationToken);
}
/// <summary>
/// Asynchronously sends <paramref name="request"/> through the shared request core.
/// </summary>
/// <remarks>
/// Calls <c>DoRequestCoreAsync</c> with <c>isAsync</c> set to <c>true</c> and converts the returned
/// <see cref="ValueTask{TResult}"/> into a <see cref="Task{TResult}"/> via <c>AsTask()</c>.
/// </remarks>
/// <typeparam name="TRequest">The concrete request type.</typeparam>
/// <typeparam name="TResponse">The response type produced by the transport.</typeparam>
/// <typeparam name="TRequestParameters">The request parameters type associated with <typeparamref name="TRequest"/>.</typeparam>
/// <param name="request">The request to send.</param>
/// <param name="forceConfiguration">Optional configuration callback, passed through unchanged to the request core.</param>
/// <param name="cancellationToken">Token passed through to the request core.</param>
/// <returns>A task that yields the response obtained from the request core.</returns>
internal Task<TResponse> DoRequestAsync<TRequest, TResponse, TRequestParameters>(
    TRequest request,
    Action<IRequestConfiguration>? forceConfiguration,
    CancellationToken cancellationToken = default)
    where TRequest : Request<TRequestParameters>
    where TResponse : TransportResponse, new()
    where TRequestParameters : RequestParameters, new()
{
    var pendingResponse = DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(
        isAsync: true,
        request: request,
        forceConfiguration: forceConfiguration,
        cancellationToken: cancellationToken);

    return pendingResponse.AsTask();
}
/// <summary>
/// Shared implementation behind the <c>DoRequest</c> and <c>DoRequestAsync</c> entry points.
/// Sends the request through the transport and, for the caller that wins the compare-and-swap on
/// <c>_productCheckStatus</c>, additionally evaluates the <c>x-elastic-product</c> response header
/// (the "product check").
/// </summary>
/// <typeparam name="TRequest">The concrete request type.</typeparam>
/// <typeparam name="TResponse">The response type produced by the transport.</typeparam>
/// <typeparam name="TRequestParameters">The request parameters type associated with <typeparamref name="TRequest"/>.</typeparam>
/// <param name="isAsync">
/// <c>true</c> to call <c>_transport.RequestAsync</c>; <c>false</c> to call the blocking
/// <c>_transport.Request</c> and wrap its result.
/// </param>
/// <param name="request">The request to send.</param>
/// <param name="forceConfiguration">
/// NOTE(review): this callback is not referenced anywhere in this method's body - confirm whether it is
/// supposed to be applied to the request configuration before sending.
/// </param>
/// <param name="cancellationToken">Token passed to <c>_transport.RequestAsync</c>; not used when <paramref name="isAsync"/> is <c>false</c>.</param>
/// <returns>A <see cref="ValueTask{TResult}"/> that yields the transport response.</returns>
/// <exception cref="UnsupportedProductException">
/// Thrown when a previous product check has failed, or when the product check performed by this call
/// receives a success status code without an <c>x-elastic-product</c> header value of <c>Elasticsearch</c>.
/// </exception>
/// <exception cref="InvalidOperationException">Thrown when <c>_productCheckStatus</c> holds a value outside the handled states.</exception>
private ValueTask<TResponse> DoRequestCoreAsync<TRequest, TResponse, TRequestParameters>(
    bool isAsync,
    TRequest request,
    Action<IRequestConfiguration>? forceConfiguration,
    CancellationToken cancellationToken = default)
    where TRequest : Request<TRequestParameters>
    where TResponse : TransportResponse, new()
    where TRequestParameters : RequestParameters, new()
{
    // The product check modifies request parameters and therefore must not be executed concurrently.
    // We use a lockless CAS approach to make sure that only a single product check request is executed at a time.
    // We do not guarantee that the product check is always performed on the first request.

    // CompareExchange returns the value that was stored *before* the call: observing 'NotChecked' here means
    // this caller just moved the status to 'InProgress' and therefore owns the product check.
    var productCheckStatus = Interlocked.CompareExchange(
        ref _productCheckStatus,
        (int)ProductCheckStatus.InProgress,
        (int)ProductCheckStatus.NotChecked
    );

    return productCheckStatus switch
    {
        (int)ProductCheckStatus.NotChecked => SendRequestWithProductCheck(),
        // Requests that arrive while another caller runs the check are sent without being checked themselves.
        (int)ProductCheckStatus.InProgress or
        (int)ProductCheckStatus.Succeeded => SendRequest(),
        (int)ProductCheckStatus.Failed => throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError),
        _ => throw new InvalidOperationException("unreachable")
    };

    // Sends the request as-is, using the request's own configuration.
    ValueTask<TResponse> SendRequest()
    {
        var (endpointPath, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request);
        var openTelemetryDataMutator = GetOpenTelemetryDataMutator<TRequest, TRequestParameters>(request, resolvedRouteValues);

        return isAsync
            ? new ValueTask<TResponse>(_transport.RequestAsync<TResponse>(endpointPath, postData, openTelemetryDataMutator, request.RequestConfiguration, cancellationToken))
            : new ValueTask<TResponse>(_transport.Request<TResponse>(endpointPath, postData, openTelemetryDataMutator, request.RequestConfiguration));
    }

    // Runs the product check and makes sure an exception does not leave the status stuck at 'InProgress'.
    async ValueTask<TResponse> SendRequestWithProductCheck()
    {
        try
        {
            return await SendRequestWithProductCheckCore().ConfigureAwait(false);
        }
        catch
        {
            // Re-try product check on next request.

            // 32-bit read/write operations are atomic and due to the initial memory barrier, we can be sure that
            // no other thread executes the product check at the same time. Locked access is not required here.
            // A status of 'Failed' (set right before UnsupportedProductException is thrown) is left untouched.
            if (_productCheckStatus is (int)ProductCheckStatus.InProgress)
            {
                _productCheckStatus = (int)ProductCheckStatus.NotChecked;
            }

            throw;
        }
    }

    // Sends the request with the product header added to the headers-to-parse list and records the outcome.
    async ValueTask<TResponse> SendRequestWithProductCheckCore()
    {
        const string ProductHeaderName = "x-elastic-product";
        const string ExpectedProductName = "Elasticsearch";

        // Attach product check header

        // TODO: The copy constructor should accept null values
        var requestConfig = (request.RequestConfiguration is null)
            ? new RequestConfiguration()
            {
                ResponseHeadersToParse = new HeadersList(ProductHeaderName)
            }
            : new RequestConfiguration(request.RequestConfiguration)
            {
                // Keep any headers the caller already asked for and append the product header to them.
                ResponseHeadersToParse = (request.RequestConfiguration.ResponseHeadersToParse is { Count: > 0 })
                    ? new HeadersList(request.RequestConfiguration.ResponseHeadersToParse, ProductHeaderName)
                    : new HeadersList(ProductHeaderName)
            };

        // Send request

        var (endpointPath, resolvedRouteValues, postData) = PrepareRequest<TRequest, TRequestParameters>(request);
        var openTelemetryDataMutator = GetOpenTelemetryDataMutator<TRequest, TRequestParameters>(request, resolvedRouteValues);

        TResponse response;

        if (isAsync)
        {
            response = await _transport
                .RequestAsync<TResponse>(endpointPath, postData, openTelemetryDataMutator, requestConfig, cancellationToken)
                .ConfigureAwait(false);
        }
        else
        {
            response = _transport.Request<TResponse>(endpointPath, postData, openTelemetryDataMutator, requestConfig);
        }

        // Evaluate product check result

        var hasSuccessStatusCode = response.ApiCallDetails.HttpStatusCode is >= 200 and <= 299;
        if (!hasSuccessStatusCode)
        {
            // The product check is unreliable for non success status codes.
            // We have to re-try on the next request.
            _productCheckStatus = (int)ProductCheckStatus.NotChecked;

            return response;
        }

        // The static string.Equals tolerates a null header value instead of throwing NullReferenceException.
        var productCheckSucceeded = response.ApiCallDetails.TryGetHeader(ProductHeaderName, out var values) &&
            values.Any(value => string.Equals(value, ExpectedProductName, StringComparison.Ordinal));

        _productCheckStatus = productCheckSucceeded
            ? (int)ProductCheckStatus.Succeeded
            : (int)ProductCheckStatus.Failed;

        // Decide on the local result rather than re-reading the shared status field.
        if (!productCheckSucceeded)
        {
            throw new UnsupportedProductException(UnsupportedProductException.InvalidProductError);
        }

        return response;
    }
}