public TResponse Request()

in src/Elastic.Transport/DistributedTransport.cs [63:274]


	/// <summary>
	/// Performs a request synchronously by running the shared request routine with
	/// <c>isAsync: false</c> and unwrapping its result on the calling thread.
	/// See <see cref="RequestAsync{TResponse}"/> for the asynchronous counterpart.
	/// </summary>
	/// <typeparam name="TResponse">The response type to return to the caller.</typeparam>
	/// <param name="path">The endpoint path (including HTTP method) to call.</param>
	/// <param name="data">The optional request body.</param>
	/// <param name="configureActivity">Optional callback that can enrich the tracing activity for this request.</param>
	/// <param name="localConfiguration">Optional per request configuration overrides.</param>
	/// <returns>The <typeparamref name="TResponse"/> produced for this request.</returns>
	public TResponse Request<TResponse>(
		in EndpointPath path,
		PostData? data,
		Action<Activity>? configureActivity,
		IRequestConfiguration? localConfiguration
	) where TResponse : TransportResponse, new()
	{
		// With isAsync set to false the core routine takes its synchronous branches throughout,
		// so the pending operation is unwrapped directly rather than awaited.
		var pendingResponse = RequestCoreAsync<TResponse>(isAsync: false, path, data, configureActivity, localConfiguration);
		return pendingResponse.EnsureCompleted();
	}

	/// <inheritdoc cref="ITransport.RequestAsync{TResponse}"/>
	public Task<TResponse> RequestAsync<TResponse>(
		in EndpointPath path,
		PostData? data,
		Action<Activity>? configureActivity,
		IRequestConfiguration? localConfiguration,
		CancellationToken cancellationToken = default
	) where TResponse : TransportResponse, new()
	{
		// The shared routine returns a ValueTask; it is converted to a Task here because
		// that is the return type this method exposes.
		var pendingResponse = RequestCoreAsync<TResponse>(isAsync: true, path, data, configureActivity, localConfiguration, cancellationToken);
		return pendingResponse.AsTask();
	}

	/// <summary>
	/// Shared implementation behind <see cref="Request{TResponse}"/> and <see cref="RequestAsync{TResponse}"/>.
	/// Binds the effective configuration, selects a pipeline, attempts the call against a single node or
	/// against each node yielded by the pipeline, records tracing information on the optional
	/// <see cref="Activity"/>, and hands the outcome to <c>FinalizeResponse</c>.
	/// </summary>
	/// <typeparam name="TResponse">The response type to return to the caller.</typeparam>
	/// <param name="isAsync">
	/// When <c>true</c> the asynchronous pipeline methods are awaited; when <c>false</c> only their
	/// synchronous counterparts are called.
	/// </param>
	/// <param name="path">The endpoint path (including HTTP method) to call.</param>
	/// <param name="data">The optional request body.</param>
	/// <param name="configureActivity">
	/// Optional callback invoked with the activity (when one was started) after the request attempts completed.
	/// </param>
	/// <param name="localConfiguration">
	/// Optional per request configuration. A <c>BoundConfiguration</c> instance is used as is, any other
	/// instance is bound against the transport configuration, and <c>null</c> reuses the transport's cached
	/// bound configuration.
	/// </param>
	/// <param name="cancellationToken">Token passed to the asynchronous pipeline calls and checked between node attempts.</param>
	/// <returns>The value returned by <c>FinalizeResponse</c>.</returns>
	/// <exception cref="UnexpectedTransportException">
	/// Thrown in the multi node path when an exception that is not a <c>PipelineException</c> escapes a node attempt.
	/// </exception>
	private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
		bool isAsync,
		EndpointPath path,
		PostData? data,
		Action<Activity>? configureActivity,
		IRequestConfiguration? localConfiguration,
		CancellationToken cancellationToken = default
	) where TResponse : TransportResponse, new()
	{
		Activity activity = null;

		// Only start an activity when something is listening.
		if (OpenTelemetry.ElasticTransportActivitySource.HasListeners())
			activity = OpenTelemetry.ElasticTransportActivitySource.StartActivity(path.Method.GetStringValue(),
				ActivityKind.Client);

		try
		{
			// Unless per request configuration is provided, we can reuse a BoundConfiguration
			// that is specific to this transport. If the IRequestConfiguration is an instance
			// of BoundConfiguration we use that cached instance directly without rebinding.
			var boundConfiguration = localConfiguration switch
			{
				BoundConfiguration bc => bc,
				{ } rc => new BoundConfiguration(Configuration, rc),
				_ => TransportBoundConfiguration
			};

			Configuration.OnConfigurationBound?.Invoke(boundConfiguration);

			// The transport level pipeline is only reused when the transport level bound configuration is in effect.
			var pipeline = boundConfiguration == TransportBoundConfiguration ? TransportPipeline : Configuration.PipelineProvider.Create(boundConfiguration);
			var startedOn = Configuration.DateTimeProvider.Now();
			var auditor = boundConfiguration.DisableAuditTrail ? null : new Auditor(Configuration.DateTimeProvider);

			if (isAsync)
				await pipeline.FirstPoolUsageAsync(Configuration.BootstrapLock, auditor, cancellationToken).ConfigureAwait(false);
			else
				pipeline.FirstPoolUsage(Configuration.BootstrapLock, auditor);

			TResponse response = null;

			var endpoint = Endpoint.Empty(path);

			if (activity is { IsAllDataRequested: true })
			{
				if (Configuration.Authentication is BasicAuthentication basicAuthentication)
					activity.SetTag(SemanticConventions.DbUser, basicAuthentication.Username);

				activity.SetTag(OpenTelemetryAttributes.ElasticTransportProductName, Configuration.ProductRegistration.Name);
				activity.SetTag(OpenTelemetryAttributes.ElasticTransportProductVersion, Configuration.ProductRegistration.ProductAssemblyVersion);
				activity.SetTag(OpenTelemetryAttributes.ElasticTransportVersion, ReflectionVersionInfo.TransportVersion);
				activity.SetTag(SemanticConventions.UserAgentOriginal, Configuration.UserAgent.ToString());
				activity.SetTag(SemanticConventions.HttpRequestMethod, endpoint.Method.GetStringValue());
			}

			List<PipelineException>? seenExceptions = null;
			var attemptedNodes = 0;

			if (pipeline.TryGetSingleNode(out var singleNode))
			{
				endpoint = endpoint with { Node = singleNode };
				// No value in marking a single node as dead. We have no other options!
				attemptedNodes = 1;
				activity?.SetTag(SemanticConventions.UrlFull, endpoint.Uri.AbsoluteUri);
				activity?.SetTag(SemanticConventions.ServerAddress, endpoint.Uri.Host);
				activity?.SetTag(SemanticConventions.ServerPort, endpoint.Uri.Port);

				try
				{
					if (isAsync)
						response = await pipeline.CallProductEndpointAsync<TResponse>(endpoint, boundConfiguration, data, auditor, cancellationToken)
							.ConfigureAwait(false);
					else
						response = pipeline.CallProductEndpoint<TResponse>(endpoint, boundConfiguration, data, auditor);
				}
				catch (PipelineException pipelineException)
				{
					// Recoverable or not, there is no other node to move on to, so both cases
					// are handled the same way in the single node path.
					HandlePipelineException(ref response, pipelineException, pipeline, singleNode, ref seenExceptions);
				}
				catch (Exception killerException)
				{
					ThrowUnexpectedTransportException(killerException, seenExceptions, endpoint, response, auditor);
				}
			}
			else
			{
				foreach (var node in pipeline.NextNode(startedOn, attemptedNodes, auditor))
				{
					attemptedNodes++;
					endpoint = endpoint with { Node = node };

					// If multiple nodes are attempted, the final node attempted will be used to set the operation span attributes.
					// Each physical node attempt in CallProductEndpoint will also record these attributes.
					activity?.SetTag(SemanticConventions.UrlFull, endpoint.Uri.AbsoluteUri);
					activity?.SetTag(SemanticConventions.ServerAddress, endpoint.Uri.Host);
					activity?.SetTag(SemanticConventions.ServerPort, endpoint.Uri.Port);

					try
					{
						if (_productRegistration.SupportsSniff)
						{
							if (isAsync)
								await pipeline.SniffOnStaleClusterAsync(auditor, cancellationToken).ConfigureAwait(false);
							else
								pipeline.SniffOnStaleCluster(auditor);
						}
						if (_productRegistration.SupportsPing)
						{
							if (isAsync)
								await PingAsync(pipeline, node, auditor, cancellationToken).ConfigureAwait(false);
							else
								Ping(pipeline, node, auditor);
						}

						if (isAsync)
							response = await pipeline.CallProductEndpointAsync<TResponse>(endpoint, boundConfiguration, data, auditor, cancellationToken)
								.ConfigureAwait(false);
						else
							response = pipeline.CallProductEndpoint<TResponse>(endpoint, boundConfiguration, data, auditor);

						if (!response.ApiCallDetails.SuccessOrKnownError)
						{
							pipeline.MarkDead(node);

							if (_productRegistration.SupportsSniff)
							{
								if (isAsync)
									await pipeline.SniffOnConnectionFailureAsync(auditor, cancellationToken).ConfigureAwait(false);
								else
									pipeline.SniffOnConnectionFailure(auditor);
							}
						}
					}
					catch (PipelineException pipelineException) when (!pipelineException.Recoverable)
					{
						// A non recoverable pipeline failure ends the node loop.
						HandlePipelineException(ref response, pipelineException, pipeline, node, ref seenExceptions);
						break;
					}
					catch (PipelineException pipelineException)
					{
						// A recoverable pipeline failure falls through to the checks below, which may move on to the next node.
						HandlePipelineException(ref response, pipelineException, pipeline, node, ref seenExceptions);
					}
					catch (Exception killerException)
					{
						if (killerException is OperationCanceledException && cancellationToken.IsCancellationRequested)
							pipeline.AuditCancellationRequested(auditor);

						throw new UnexpectedTransportException(killerException, seenExceptions)
						{
							Endpoint = endpoint,
							ApiCallDetails = response?.ApiCallDetails,
							AuditTrail = auditor
						};
					}

					if (cancellationToken.IsCancellationRequested)
					{
						pipeline.AuditCancellationRequested(auditor);
						break;
					}

					if (response == null || !response.ApiCallDetails.SuccessOrKnownError) continue;

					pipeline.MarkAlive(node);
					break;
				}
			}

			if (activity is not null)
			{
				// The response may still be null at this point (no node produced one, the loop was left because
				// of cancellation, or a pipeline exception was handled), so it must not be dereferenced
				// unconditionally: a NullReferenceException here would only occur while a listener is attached
				// and would prevent FinalizeResponse from running.
				var apiCallDetails = response?.ApiCallDetails;

#if NET6_0_OR_GREATER
				// A missing response is reported as an error on the span.
				var succeeded = apiCallDetails is { HasSuccessfulStatusCodeAndExpectedContentType: true };
				activity.SetStatus(succeeded ? ActivityStatusCode.Ok : ActivityStatusCode.Error);
#endif

				if (apiCallDetails is not null)
					activity.SetTag(SemanticConventions.HttpResponseStatusCode, apiCallDetails.HttpStatusCode);

				activity.SetTag(OpenTelemetryAttributes.ElasticTransportAttemptedNodes, attemptedNodes);

				// We don't check IsAllDataRequested here as that's left to the consumer.
				configureActivity?.Invoke(activity);

				if (activity.IsAllDataRequested)
					OpenTelemetry.SetCommonAttributes(activity, Configuration);
			}

			return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, attemptedNodes, auditor, seenExceptions, response);
		}
		finally
		{
			// Disposing the activity ends the span regardless of how the request completed.
			activity?.Dispose();
		}
	}