in sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java [7245:7470]
private <T> Mono<T> executeFeedOperationWithAvailabilityStrategy(
final ResourceType resourceType,
final OperationType operationType,
final Supplier<DocumentClientRetryPolicy> retryPolicyFactory,
final RxDocumentServiceRequest req,
final BiFunction<Supplier<DocumentClientRetryPolicy>, RxDocumentServiceRequest, Mono<T>> feedOperation,
final String collectionLink) {
checkNotNull(retryPolicyFactory, "Argument 'retryPolicyFactory' must not be null.");
checkNotNull(req, "Argument 'req' must not be null.");
assert(resourceType == ResourceType.Document);
CosmosEndToEndOperationLatencyPolicyConfig endToEndPolicyConfig =
this.getEffectiveEndToEndOperationLatencyPolicyConfig(
req.requestContext.getEndToEndOperationLatencyPolicyConfig(), resourceType, operationType);
List<String> initialExcludedRegions = req.requestContext.getExcludeRegions();
List<String> orderedApplicableRegionsForSpeculation = this.getApplicableRegionsForSpeculation(
endToEndPolicyConfig,
resourceType,
operationType,
false,
initialExcludedRegions);
Map<PartitionKeyRangeWrapper, PartitionKeyRangeWrapper> partitionKeyRangesWithSuccess = new ConcurrentHashMap<>();
if (orderedApplicableRegionsForSpeculation.size() < 2) {
FeedOperationContextForCircuitBreaker feedOperationContextForCircuitBreakerForRequestOutsideOfAvailabilityStrategyFlow
= new FeedOperationContextForCircuitBreaker(
partitionKeyRangesWithSuccess,
false,
collectionLink);
feedOperationContextForCircuitBreakerForRequestOutsideOfAvailabilityStrategyFlow.setIsRequestHedged(false);
AvailabilityStrategyContext availabilityStrategyContext = new AvailabilityStrategyContext(false, false);
CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRequest
= new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
feedOperationContextForCircuitBreakerForRequestOutsideOfAvailabilityStrategyFlow,
null,
availabilityStrategyContext);
req.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest);
// There is at most one applicable region - no hedging possible
return feedOperation.apply(retryPolicyFactory, req);
}
FeedOperationContextForCircuitBreaker feedOperationContextForCircuitBreakerForParentRequestInAvailabilityStrategyFlow
= new FeedOperationContextForCircuitBreaker(
partitionKeyRangesWithSuccess,
true,
collectionLink);
feedOperationContextForCircuitBreakerForParentRequestInAvailabilityStrategyFlow.setIsRequestHedged(false);
AvailabilityStrategyContext availabilityStrategyContext = new AvailabilityStrategyContext(true, false);
CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRequest = new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
feedOperationContextForCircuitBreakerForParentRequestInAvailabilityStrategyFlow,
null,
availabilityStrategyContext);
req.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequest);
ThresholdBasedAvailabilityStrategy availabilityStrategy =
(ThresholdBasedAvailabilityStrategy)endToEndPolicyConfig.getAvailabilityStrategy();
List<Mono<NonTransientFeedOperationResult<T>>> monoList = new ArrayList<>();
orderedApplicableRegionsForSpeculation
.forEach(region -> {
RxDocumentServiceRequest clonedRequest = req.clone();
if (monoList.isEmpty()) {
// no special error handling for transient errors to suppress them here
// because any cross-regional retries are expected to be processed
// by the ClientRetryPolicy for the initial request - so, any outcome of the
// initial Mono should be treated as non-transient error - even when
// the error would otherwise be treated as transient
FeedOperationContextForCircuitBreaker feedOperationContextForCircuitBreakerForNonHedgedRequest
= new FeedOperationContextForCircuitBreaker(
partitionKeyRangesWithSuccess,
true,
collectionLink);
feedOperationContextForCircuitBreakerForNonHedgedRequest.setIsRequestHedged(false);
AvailabilityStrategyContext availabilityStrategyContextForNonHedgedRequest = new AvailabilityStrategyContext(true, false);
CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRequestForNonHedgedRequest = new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
feedOperationContextForCircuitBreakerForNonHedgedRequest,
null,
availabilityStrategyContextForNonHedgedRequest);
clonedRequest.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequestForNonHedgedRequest);
Mono<NonTransientFeedOperationResult<T>> initialMonoAcrossAllRegions =
handleCircuitBreakingFeedbackForFeedOperationWithAvailabilityStrategy(feedOperation.apply(retryPolicyFactory, clonedRequest)
.map(NonTransientFeedOperationResult::new)
.onErrorResume(
RxDocumentClientImpl::isCosmosException,
t -> Mono.just(
new NonTransientFeedOperationResult<>(
Utils.as(Exceptions.unwrap(t), CosmosException.class)))), clonedRequest);
if (logger.isDebugEnabled()) {
monoList.add(initialMonoAcrossAllRegions.doOnSubscribe(c -> logger.debug(
"STARTING to process {} operation in region '{}'",
operationType,
region)));
} else {
monoList.add(initialMonoAcrossAllRegions);
}
} else {
clonedRequest.requestContext.setExcludeRegions(
getEffectiveExcludedRegionsForHedging(
initialExcludedRegions,
orderedApplicableRegionsForSpeculation,
region)
);
FeedOperationContextForCircuitBreaker feedOperationContextForCircuitBreakerForHedgedRequest
= new FeedOperationContextForCircuitBreaker(
partitionKeyRangesWithSuccess,
true,
collectionLink);
feedOperationContextForCircuitBreakerForHedgedRequest.setIsRequestHedged(true);
AvailabilityStrategyContext availabilityStrategyContextForHedgedRequest = new AvailabilityStrategyContext(true, true);
CrossRegionAvailabilityContextForRxDocumentServiceRequest crossRegionAvailabilityContextForRequestForHedgedRequest
= new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
feedOperationContextForCircuitBreakerForHedgedRequest,
null,
availabilityStrategyContextForHedgedRequest
);
clonedRequest.requestContext.setCrossRegionAvailabilityContext(crossRegionAvailabilityContextForRequestForHedgedRequest);
clonedRequest.requestContext.setKeywordIdentifiers(req.requestContext.getKeywordIdentifiers());
// Non-Transient errors are mapped to a value - this ensures the firstWithValue
// operator below will complete the composite Mono for both successful values
// and non-transient errors
Mono<NonTransientFeedOperationResult<T>> regionalCrossRegionRetryMono =
handleCircuitBreakingFeedbackForFeedOperationWithAvailabilityStrategy(feedOperation.apply(retryPolicyFactory, clonedRequest)
.map(NonTransientFeedOperationResult::new)
.onErrorResume(
RxDocumentClientImpl::isNonTransientCosmosException,
t -> Mono.just(
new NonTransientFeedOperationResult<>(
Utils.as(Exceptions.unwrap(t), CosmosException.class)))), clonedRequest);
Duration delayForCrossRegionalRetry = (availabilityStrategy)
.getThreshold()
.plus((availabilityStrategy)
.getThresholdStep()
.multipliedBy(monoList.size() - 1));
if (logger.isDebugEnabled()) {
monoList.add(
regionalCrossRegionRetryMono
.doOnSubscribe(c -> logger.debug("STARTING to process {} operation in region '{}'", operationType, region))
.delaySubscription(delayForCrossRegionalRetry));
} else {
monoList.add(
regionalCrossRegionRetryMono
.delaySubscription(delayForCrossRegionalRetry));
}
}
});
// NOTE - merging diagnosticsFactory cannot only happen in
// doFinally operator because the doFinally operator is a side effect method -
// meaning it executes concurrently with firing the onComplete/onError signal
// doFinally is also triggered by cancellation
// So, to make sure merging the Context happens synchronously in line we
// have to ensure merging is happening on error/completion
// and also in doOnCancel.
return Mono
.firstWithValue(monoList)
.flatMap(nonTransientResult -> {
if (nonTransientResult.isError()) {
return Mono.error(nonTransientResult.exception);
}
return Mono.just(nonTransientResult.response);
})
.onErrorMap(throwable -> {
Throwable exception = Exceptions.unwrap(throwable);
if (exception instanceof NoSuchElementException) {
List<Throwable> innerThrowables = Exceptions
.unwrapMultiple(exception.getCause());
int index = 0;
for (Throwable innerThrowable : innerThrowables) {
Throwable innerException = Exceptions.unwrap(innerThrowable);
// collect latest CosmosException instance bubbling up for a region
if (innerException instanceof CosmosException) {
return Utils.as(innerException, CosmosException.class);
} else if (innerException instanceof NoSuchElementException) {
logger.trace(
"Operation in {} completed with empty result because it was cancelled.",
orderedApplicableRegionsForSpeculation.get(index));
} else if (logger.isWarnEnabled()) {
String message = "Unexpected Non-CosmosException when processing operation in '"
+ orderedApplicableRegionsForSpeculation.get(index)
+ "'.";
logger.warn(
message,
innerException
);
}
index++;
}
}
return exception;
});
}