in src/Elastic.Transport/DistributedTransport.cs [63:274]
public TResponse Request<TResponse>(
    in EndpointPath path,
    PostData? data,
    Action<Activity>? configureActivity,
    IRequestConfiguration? localConfiguration
) where TResponse : TransportResponse, new()
{
    // Synchronous entry point: the shared core is invoked with isAsync: false and no
    // cancellation token (the core's parameter falls back to its default).
    var pending = RequestCoreAsync<TResponse>(isAsync: false, path, data, configureActivity, localConfiguration);

    // NOTE(review): EnsureCompleted is defined elsewhere; presumably it unwraps a ValueTask
    // that is expected to have completed synchronously - confirm against its definition.
    return pending.EnsureCompleted();
}
/// <inheritdoc cref="ITransport.RequestAsync{TResponse}"/>
public Task<TResponse> RequestAsync<TResponse>(
    in EndpointPath path,
    PostData? data,
    Action<Activity>? configureActivity,
    IRequestConfiguration? localConfiguration,
    CancellationToken cancellationToken = default
) where TResponse : TransportResponse, new()
{
    // Asynchronous entry point: the shared core is invoked with isAsync: true and the
    // caller's cancellation token.
    var pending = RequestCoreAsync<TResponse>(
        isAsync: true, path, data, configureActivity, localConfiguration, cancellationToken);

    // The core returns a ValueTask; callers of this method receive a regular Task.
    return pending.AsTask();
}
/// <summary>
/// Shared implementation behind the synchronous <c>Request</c> and asynchronous <c>RequestAsync</c>
/// entry points. Resolves the configuration and pipeline for the call, sends the request to the single
/// configured node or to successive nodes returned by the pipeline, records OpenTelemetry tags on the
/// activity (when one was started), and hands the outcome to <c>FinalizeResponse</c>.
/// </summary>
/// <typeparam name="TResponse">The response type to produce.</typeparam>
/// <param name="isAsync">
/// When <c>true</c> the asynchronous pipeline members are awaited; when <c>false</c> their synchronous
/// counterparts are called instead.
/// </param>
/// <param name="path">The endpoint path; its method also names the started activity.</param>
/// <param name="data">Optional request body, forwarded to the pipeline.</param>
/// <param name="configureActivity">Optional callback invoked with the activity, if one was started.</param>
/// <param name="localConfiguration">
/// Optional per-request configuration. A <c>BoundConfiguration</c> instance is used as-is, any other
/// non-null value is bound against the transport configuration, and <c>null</c> selects the transport's
/// cached bound configuration.
/// </param>
/// <param name="cancellationToken">
/// Token forwarded to the asynchronous pipeline calls and checked after every node attempt in the
/// multi-node loop.
/// </param>
/// <returns>The value returned by <c>FinalizeResponse</c>.</returns>
/// <exception cref="UnexpectedTransportException">
/// Thrown in the multi-node loop when an exception other than <c>PipelineException</c> escapes an attempt.
/// </exception>
private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
    bool isAsync,
    EndpointPath path,
    PostData? data,
    Action<Activity>? configureActivity,
    IRequestConfiguration? localConfiguration,
    CancellationToken cancellationToken = default
) where TResponse : TransportResponse, new()
{
    // Only start an activity when something is listening; it stays null otherwise.
    Activity activity = null;
    if (OpenTelemetry.ElasticTransportActivitySource.HasListeners())
        activity = OpenTelemetry.ElasticTransportActivitySource.StartActivity(path.Method.GetStringValue(),
            ActivityKind.Client);

    try
    {
        // Unless per request configuration is provided, we can reuse a BoundConfiguration
        // that is specific to this transport. If the IRequestConfiguration is an instance
        // of BoundConfiguration we use that cached instance directly without rebinding.
        var boundConfiguration = localConfiguration switch
        {
            BoundConfiguration bc => bc,
            { } rc => new BoundConfiguration(Configuration, rc),
            _ => TransportBoundConfiguration
        };

        Configuration.OnConfigurationBound?.Invoke(boundConfiguration);

        // The transport-level pipeline is reused only together with the transport-level bound configuration.
        var pipeline = boundConfiguration == TransportBoundConfiguration ? TransportPipeline : Configuration.PipelineProvider.Create(boundConfiguration);
        var startedOn = Configuration.DateTimeProvider.Now();
        var auditor = boundConfiguration.DisableAuditTrail ? null : new Auditor(Configuration.DateTimeProvider);

        if (isAsync)
            await pipeline.FirstPoolUsageAsync(Configuration.BootstrapLock, auditor, cancellationToken).ConfigureAwait(false);
        else
            pipeline.FirstPoolUsage(Configuration.BootstrapLock, auditor);

        TResponse response = null;
        var endpoint = Endpoint.Empty(path);

        if (activity is { IsAllDataRequested: true })
        {
            if (Configuration.Authentication is BasicAuthentication basicAuthentication)
                activity.SetTag(SemanticConventions.DbUser, basicAuthentication.Username);

            activity.SetTag(OpenTelemetryAttributes.ElasticTransportProductName, Configuration.ProductRegistration.Name);
            activity.SetTag(OpenTelemetryAttributes.ElasticTransportProductVersion, Configuration.ProductRegistration.ProductAssemblyVersion);
            activity.SetTag(OpenTelemetryAttributes.ElasticTransportVersion, ReflectionVersionInfo.TransportVersion);
            activity.SetTag(SemanticConventions.UserAgentOriginal, Configuration.UserAgent.ToString());
            activity.SetTag(SemanticConventions.HttpRequestMethod, endpoint.Method.GetStringValue());
        }

        List<PipelineException>? seenExceptions = null;
        var attemptedNodes = 0;

        if (pipeline.TryGetSingleNode(out var singleNode))
        {
            endpoint = endpoint with { Node = singleNode };

            // No value in marking a single node as dead. We have no other options!
            attemptedNodes = 1;

            activity?.SetTag(SemanticConventions.UrlFull, endpoint.Uri.AbsoluteUri);
            activity?.SetTag(SemanticConventions.ServerAddress, endpoint.Uri.Host);
            activity?.SetTag(SemanticConventions.ServerPort, endpoint.Uri.Port);

            try
            {
                if (isAsync)
                    response = await pipeline.CallProductEndpointAsync<TResponse>(endpoint, boundConfiguration, data, auditor, cancellationToken)
                        .ConfigureAwait(false);
                else
                    response = pipeline.CallProductEndpoint<TResponse>(endpoint, boundConfiguration, data, auditor);
            }
            catch (PipelineException pipelineException)
            {
                // With a single node there is nothing to retry against, so recoverable and
                // non-recoverable pipeline exceptions are handled the same way.
                HandlePipelineException(ref response, pipelineException, pipeline, singleNode, ref seenExceptions);
            }
            catch (Exception killerException)
            {
                // NOTE(review): unlike the multi-node path below, cancellation is not audited here
                // before delegating to the helper - confirm whether that difference is intentional.
                ThrowUnexpectedTransportException(killerException, seenExceptions, endpoint, response, auditor);
            }
        }
        else
        {
            foreach (var node in pipeline.NextNode(startedOn, attemptedNodes, auditor))
            {
                attemptedNodes++;
                endpoint = endpoint with { Node = node };

                // If multiple nodes are attempted, the final node attempted will be used to set the operation span attributes.
                // Each physical node attempt in CallProductEndpoint will also record these attributes.
                activity?.SetTag(SemanticConventions.UrlFull, endpoint.Uri.AbsoluteUri);
                activity?.SetTag(SemanticConventions.ServerAddress, endpoint.Uri.Host);
                activity?.SetTag(SemanticConventions.ServerPort, endpoint.Uri.Port);

                try
                {
                    if (_productRegistration.SupportsSniff)
                    {
                        if (isAsync)
                            await pipeline.SniffOnStaleClusterAsync(auditor, cancellationToken).ConfigureAwait(false);
                        else
                            pipeline.SniffOnStaleCluster(auditor);
                    }

                    if (_productRegistration.SupportsPing)
                    {
                        if (isAsync)
                            await PingAsync(pipeline, node, auditor, cancellationToken).ConfigureAwait(false);
                        else
                            Ping(pipeline, node, auditor);
                    }

                    if (isAsync)
                        response = await pipeline.CallProductEndpointAsync<TResponse>(endpoint, boundConfiguration, data, auditor, cancellationToken)
                            .ConfigureAwait(false);
                    else
                        response = pipeline.CallProductEndpoint<TResponse>(endpoint, boundConfiguration, data, auditor);

                    if (!response.ApiCallDetails.SuccessOrKnownError)
                    {
                        pipeline.MarkDead(node);

                        if (_productRegistration.SupportsSniff)
                        {
                            if (isAsync)
                                await pipeline.SniffOnConnectionFailureAsync(auditor, cancellationToken).ConfigureAwait(false);
                            else
                                pipeline.SniffOnConnectionFailure(auditor);
                        }
                    }
                }
                catch (PipelineException pipelineException) when (!pipelineException.Recoverable)
                {
                    // Non-recoverable: stop trying further nodes.
                    HandlePipelineException(ref response, pipelineException, pipeline, node, ref seenExceptions);
                    break;
                }
                catch (PipelineException pipelineException)
                {
                    // Recoverable: fall through to the checks below, which may move on to the next node.
                    HandlePipelineException(ref response, pipelineException, pipeline, node, ref seenExceptions);
                }
                catch (Exception killerException)
                {
                    if (killerException is OperationCanceledException && cancellationToken.IsCancellationRequested)
                        pipeline.AuditCancellationRequested(auditor);

                    throw new UnexpectedTransportException(killerException, seenExceptions)
                    {
                        Endpoint = endpoint,
                        ApiCallDetails = response?.ApiCallDetails,
                        AuditTrail = auditor
                    };
                }

                if (cancellationToken.IsCancellationRequested)
                {
                    pipeline.AuditCancellationRequested(auditor);
                    break;
                }

                if (response == null || !response.ApiCallDetails.SuccessOrKnownError) continue;

                pipeline.MarkAlive(node);
                break;
            }
        }

        // The response can still be null at this point (the loop above explicitly checks for it), for
        // example when no attempt produced one. Read the call details null-safely so that recording
        // telemetry never throws and FinalizeResponse always gets the chance to handle that outcome.
        var apiCallDetails = response?.ApiCallDetails;

#if NET6_0_OR_GREATER
        activity?.SetStatus(apiCallDetails?.HasSuccessfulStatusCodeAndExpectedContentType == true ? ActivityStatusCode.Ok : ActivityStatusCode.Error);
#endif

        activity?.SetTag(SemanticConventions.HttpResponseStatusCode, apiCallDetails?.HttpStatusCode);
        activity?.SetTag(OpenTelemetryAttributes.ElasticTransportAttemptedNodes, attemptedNodes);

        // We don't check IsAllDataRequested here as that's left to the consumer.
        if (configureActivity is not null && activity is not null)
            configureActivity.Invoke(activity);

        if (activity is { IsAllDataRequested: true })
            OpenTelemetry.SetCommonAttributes(activity, Configuration);

        return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, attemptedNodes, auditor, seenExceptions, response);
    }
    finally
    {
        // Disposing ends the activity on every exit path, including exceptions.
        activity?.Dispose();
    }
}