in Microsoft.Azure.Cosmos/src/Handler/RequestInvokerHandler.cs [180:379]
/// <summary>
/// Builds a <see cref="RequestMessage"/> for the given resource and operation, applies
/// feed-range based routing (partition key header, partition key range id, or EPK filter
/// headers), sets operation specific headers, lets the caller enrich the request, and then
/// forwards it to <c>SendAsync(RequestMessage, CancellationToken)</c>.
/// </summary>
/// <param name="resourceUriString">Resource URI of the request. Must not be null.</param>
/// <param name="resourceType">Resource type stamped on the request.</param>
/// <param name="operationType">Operation type stamped on the request; also drives the HTTP method and the Upsert/Patch/change-feed headers.</param>
/// <param name="requestOptions">Request options attached to the request (may be null).</param>
/// <param name="cosmosContainerCore">Container used to resolve the none partition key value, cached container properties, and container/database ids. Required when <paramref name="feedRange"/> is a none partition key or a <c>FeedRangeEpk</c>.</param>
/// <param name="feedRange">Optional feed range used to route the request.</param>
/// <param name="streamPayload">Optional request body.</param>
/// <param name="requestEnricher">Optional callback invoked with the fully built request just before it is sent.</param>
/// <param name="trace">Parent trace; a child trace is started for this handler. Must not be null.</param>
/// <param name="cancellationToken">Cancellation token flowed to the awaited calls that accept one.</param>
/// <returns>
/// The response of the send pipeline. Some routing failures are returned as a response instead
/// of being thrown: a <c>DocumentClientException</c>/<c>CosmosException</c> while resolving the
/// none partition key value, a <c>CosmosException</c> while reading cached container properties,
/// a 404 when no overlapping ranges can be resolved, and a 410 (PartitionKeyRangeGone) when the
/// EPK range overlaps more than one partition key range.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="resourceUriString"/> or <paramref name="trace"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="cosmosContainerCore"/> is null while the feed range is a none partition key or a <c>FeedRangeEpk</c>.</exception>
/// <exception cref="InvalidOperationException">The feed range is of an unknown type.</exception>
public virtual async Task<ResponseMessage> SendAsync(
    string resourceUriString,
    ResourceType resourceType,
    OperationType operationType,
    RequestOptions requestOptions,
    ContainerInternal cosmosContainerCore,
    FeedRange feedRange,
    Stream streamPayload,
    Action<RequestMessage> requestEnricher,
    ITrace trace,
    CancellationToken cancellationToken)
{
    if (resourceUriString == null)
    {
        throw new ArgumentNullException(nameof(resourceUriString));
    }

    if (trace == null)
    {
        throw new ArgumentNullException(nameof(trace));
    }

    // This is needed for query where a single
    // user request might span multiple backend requests.
    // This will still have a single request id for retry scenarios
    ActivityScope activityScope = ActivityScope.CreateIfDefaultActivityId();

    // A non-null scope presumably means no activity id had been set by the caller (confirm
    // against ActivityScope). That is only acceptable for non-query operations, so all three
    // comparisons must hold (&&); joining them with || would be always true and never fire.
    Debug.Assert(
        activityScope == null
            || (operationType != OperationType.SqlQuery
                && operationType != OperationType.Query
                && operationType != OperationType.QueryPlan),
        "There should be an activity id already set");

    using (ITrace childTrace = trace.StartChild(this.FullHandlerName, TraceComponent.RequestHandler, Tracing.TraceLevel.Info))
    {
        try
        {
            HttpMethod method = RequestInvokerHandler.GetHttpMethod(resourceType, operationType, streamPayload != null);
            RequestMessage request = new RequestMessage(
                method,
                resourceUriString,
                childTrace)
            {
                OperationType = operationType,
                ResourceType = resourceType,
                RequestOptions = requestOptions,
                Content = streamPayload,
            };

            request.Headers.SDKSupportedCapabilities = Headers.SDKSUPPORTEDCAPABILITIES;

            if (feedRange != null)
            {
                // Non-point operations first get their feed range re-resolved against the container.
                if (!request.OperationType.IsPointOperation())
                {
                    feedRange = await RequestInvokerHandler.ResolveFeedRangeBasedOnPrefixContainerAsync(
                        feedRange: feedRange,
                        cosmosContainerCore: cosmosContainerCore,
                        cancellationToken: cancellationToken);
                }

                if (feedRange is FeedRangePartitionKey feedRangePartitionKey)
                {
                    if (feedRangePartitionKey.PartitionKey.IsNone)
                    {
                        // Guard on IsNone, the same predicate that selects this branch, so a
                        // missing container always surfaces as ArgumentException instead of a
                        // NullReferenceException on the call below.
                        if (cosmosContainerCore == null)
                        {
                            throw new ArgumentException($"{nameof(cosmosContainerCore)} can not be null with partition key as PartitionKey.None");
                        }

                        try
                        {
                            PartitionKeyInternal partitionKeyInternal = await cosmosContainerCore.GetNonePartitionKeyValueAsync(
                                childTrace,
                                cancellationToken);
                            request.Headers.PartitionKey = partitionKeyInternal.ToJsonString();
                        }
                        catch (DocumentClientException dce)
                        {
                            return dce.ToCosmosResponseMessage(request);
                        }
                        catch (CosmosException ce)
                        {
                            return ce.ToCosmosResponseMessage(request);
                        }
                    }
                    else
                    {
                        request.Headers.PartitionKey = feedRangePartitionKey.PartitionKey.ToJsonString();
                    }
                }
                else if (feedRange is FeedRangeEpk feedRangeEpk)
                {
                    ContainerProperties collectionFromCache;
                    try
                    {
                        if (cosmosContainerCore == null)
                        {
                            // Not a CosmosException, so this propagates to the caller.
                            throw new ArgumentException("The container core can not be null for FeedRangeEpk");
                        }

                        collectionFromCache = await cosmosContainerCore.GetCachedContainerPropertiesAsync(
                            forceRefresh: false,
                            childTrace,
                            cancellationToken);
                    }
                    catch (CosmosException ex)
                    {
                        return ex.ToCosmosResponseMessage(request);
                    }

                    PartitionKeyRangeCache routingMapProvider = await this.client.DocumentClient.GetPartitionKeyRangeCacheAsync(childTrace);
                    IReadOnlyList<PartitionKeyRange> overlappingRanges = await routingMapProvider.TryGetOverlappingRangesAsync(
                        collectionFromCache.ResourceId,
                        feedRangeEpk.Range,
                        childTrace,
                        forceRefresh: false);

                    if (overlappingRanges == null)
                    {
                        CosmosException notFound = new CosmosException(
                            $"Stale cache for rid '{collectionFromCache.ResourceId}'",
                            statusCode: System.Net.HttpStatusCode.NotFound,
                            subStatusCode: default,
                            activityId: Guid.Empty.ToString(),
                            requestCharge: default);
                        return notFound.ToCosmosResponseMessage(request);
                    }

                    // For epk range filtering we can end up in one of 3 cases:
                    if (overlappingRanges.Count > 1)
                    {
                        // 1) The EpkRange spans more than one physical partition
                        // In this case it means we have encountered a split and
                        // we need to bubble that up to the higher layers to update their datastructures
                        CosmosException goneException = new CosmosException(
                            message: $"Epk Range: {feedRangeEpk.Range} is gone.",
                            statusCode: System.Net.HttpStatusCode.Gone,
                            subStatusCode: (int)SubStatusCodes.PartitionKeyRangeGone,
                            activityId: Guid.NewGuid().ToString(),
                            requestCharge: default);
                        return goneException.ToCosmosResponseMessage(request);
                    }

                    // From here on exactly one overlapping range is expected.
                    // NOTE(review): an empty (non-null) list would make the indexer below throw;
                    // presumably the range cache never returns one for a valid range - confirm.
                    PartitionKeyRange targetRange = overlappingRanges[0];
                    Range<string> singleRange = targetRange.ToRange();

                    // Cases 2) and 3) both route to the single physical partition key range.
                    request.PartitionKeyRangeId = new Documents.PartitionKeyRangeIdentity(targetRange.Id);

                    bool coversWholePartition = (singleRange.Min == feedRangeEpk.Range.Min)
                        && (singleRange.Max == feedRangeEpk.Range.Max);
                    if (!coversWholePartition)
                    {
                        // 3) The EpkRange spans less than single physical partition
                        // In this case we additionally pass the epk range headers
                        // to filter within the partition.
                        // (Case 2, an exact match, needs no extra headers.)
                        request.Headers.ReadFeedKeyType = RntbdConstants.RntdbReadFeedKeyType.EffectivePartitionKeyRange.ToString();
                        request.Headers.StartEpk = feedRangeEpk.Range.Min;
                        request.Headers.EndEpk = feedRangeEpk.Range.Max;
                    }
                }
                else
                {
                    request.PartitionKeyRangeId = feedRange is FeedRangePartitionKeyRange feedRangePartitionKeyRange
                        ? new Documents.PartitionKeyRangeIdentity(feedRangePartitionKeyRange.PartitionKeyRangeId)
                        : throw new InvalidOperationException($"Unknown feed range type: '{feedRange.GetType()}'.");
                }
            }

            if (operationType == OperationType.Upsert)
            {
                request.Headers.IsUpsert = bool.TrueString;
            }
            else if (operationType == OperationType.Patch)
            {
                request.Headers.ContentType = RuntimeConstants.MediaTypes.JsonPatch;
            }

            if (ChangeFeedHelper.IsChangeFeedWithQueryRequest(operationType, streamPayload != null))
            {
                request.Headers.Add(HttpConstants.HttpHeaders.IsQuery, bool.TrueString);
                request.Headers.Add(HttpConstants.HttpHeaders.ContentType, RuntimeConstants.MediaTypes.QueryJson);
            }

            if (cosmosContainerCore != null)
            {
                request.ContainerId = cosmosContainerCore.Id;
                request.DatabaseId = cosmosContainerCore.Database.Id;
            }

            // Last chance for the caller to customize the request before it enters the pipeline.
            requestEnricher?.Invoke(request);

            return await this.SendAsync(request, cancellationToken);
        }
        finally
        {
            // Dispose the activity scope only if this call created it.
            activityScope?.Dispose();
        }
    }
}