private Mono executeFeedOperationWithAvailabilityStrategy()

in sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java [7245:7470]


    /**
     * Executes a document feed operation, hedging it across regions when the effective end-to-end
     * latency policy yields at least two applicable regions for speculation.
     *
     * <p>With fewer than two applicable regions, {@code feedOperation} is applied directly to {@code req}
     * after attaching a cross-region availability context created outside of the availability-strategy flow.
     *
     * <p>Otherwise one {@code Mono} per applicable region is built, each on its own clone of {@code req}.
     * <ul>
     *   <li>The initial request (first region in speculation order) is subscribed immediately. Every
     *   CosmosException it produces is treated as a final outcome.</li>
     *   <li>Each subsequent (hedged) request uses the excluded regions computed by
     *   {@code getEffectiveExcludedRegionsForHedging} for its region. Its subscription is delayed by
     *   {@code threshold + thresholdStep * (k - 1)}, where {@code k} is the region's 0-based position
     *   in the speculation order. Only non-transient CosmosExceptions are treated as final outcomes.</li>
     * </ul>
     * The first final outcome (response or CosmosException) wins via {@code Mono.firstWithValue}.
     *
     * @param resourceType the resource type; only {@link ResourceType#Document} is expected (asserted)
     * @param operationType the operation type, used to resolve the latency policy config and for logging
     * @param retryPolicyFactory factory handed to {@code feedOperation}; must not be null
     * @param req the request; must not be null. Its cross-region availability context is overwritten.
     * @param feedOperation executes the feed operation for a retry policy factory and a request
     * @param collectionLink collection link recorded in the circuit-breaker feed operation contexts
     * @param <T> the response type of the feed operation
     * @return a {@code Mono} emitting the winning response. It errors with the winning CosmosException,
     *     or, when no region produced a final outcome, with the first CosmosException reported in
     *     region order (falling back to the unwrapped original error).
     */
    private <T> Mono<T> executeFeedOperationWithAvailabilityStrategy(
        final ResourceType resourceType,
        final OperationType operationType,
        final Supplier<DocumentClientRetryPolicy> retryPolicyFactory,
        final RxDocumentServiceRequest req,
        final BiFunction<Supplier<DocumentClientRetryPolicy>, RxDocumentServiceRequest, Mono<T>> feedOperation,
        final String collectionLink) {

        checkNotNull(retryPolicyFactory, "Argument 'retryPolicyFactory' must not be null.");
        checkNotNull(req, "Argument 'req' must not be null.");
        assert(resourceType == ResourceType.Document);

        CosmosEndToEndOperationLatencyPolicyConfig endToEndPolicyConfig =
            this.getEffectiveEndToEndOperationLatencyPolicyConfig(
                req.requestContext.getEndToEndOperationLatencyPolicyConfig(), resourceType, operationType);

        List<String> initialExcludedRegions = req.requestContext.getExcludeRegions();
        List<String> orderedApplicableRegionsForSpeculation = this.getApplicableRegionsForSpeculation(
            endToEndPolicyConfig,
            resourceType,
            operationType,
            false,
            initialExcludedRegions);

        // One map instance is shared by the circuit-breaker contexts of every request of this operation
        Map<PartitionKeyRangeWrapper, PartitionKeyRangeWrapper> partitionKeyRangesWithSuccess = new ConcurrentHashMap<>();

        if (orderedApplicableRegionsForSpeculation.size() < 2) {
            req.requestContext.setCrossRegionAvailabilityContext(
                createCrossRegionAvailabilityContextForFeedRequest(
                    partitionKeyRangesWithSuccess, false, false, collectionLink));

            // There is at most one applicable region - no hedging possible
            return feedOperation.apply(retryPolicyFactory, req);
        }

        // The parent request itself is not executed below (only clones are), but it carries
        // a context marking it as part of the availability-strategy flow.
        req.requestContext.setCrossRegionAvailabilityContext(
            createCrossRegionAvailabilityContextForFeedRequest(
                partitionKeyRangesWithSuccess, true, false, collectionLink));

        ThresholdBasedAvailabilityStrategy availabilityStrategy =
            (ThresholdBasedAvailabilityStrategy) endToEndPolicyConfig.getAvailabilityStrategy();
        List<Mono<NonTransientFeedOperationResult<T>>> monoList =
            new ArrayList<>(orderedApplicableRegionsForSpeculation.size());

        for (String region : orderedApplicableRegionsForSpeculation) {
            RxDocumentServiceRequest clonedRequest = req.clone();

            // Only the first region gets the initial (non-hedged) request
            boolean isHedgedRequest = !monoList.isEmpty();

            if (isHedgedRequest) {
                clonedRequest.requestContext.setExcludeRegions(
                    getEffectiveExcludedRegionsForHedging(
                        initialExcludedRegions,
                        orderedApplicableRegionsForSpeculation,
                        region));
            }

            clonedRequest.requestContext.setCrossRegionAvailabilityContext(
                createCrossRegionAvailabilityContextForFeedRequest(
                    partitionKeyRangesWithSuccess, true, isHedgedRequest, collectionLink));

            if (isHedgedRequest) {
                // NOTE(review): keyword identifiers are copied explicitly only for hedged clones -
                // confirm req.clone() already carries them for the initial clone.
                clonedRequest.requestContext.setKeywordIdentifiers(req.requestContext.getKeywordIdentifiers());
            }

            // For the initial request, no transient errors are suppressed: any cross-regional
            // retries are expected to be processed by the ClientRetryPolicy for the initial
            // request, so every CosmosException it produces is treated as non-transient.
            // For hedged requests only non-transient errors become values, so transient failures
            // in a hedged region do not end the operation.
            Mono<NonTransientFeedOperationResult<T>> regionalMono = toNonTransientFeedOperationResultMono(
                feedOperation.apply(retryPolicyFactory, clonedRequest),
                !isHedgedRequest,
                clonedRequest);

            if (logger.isDebugEnabled()) {
                regionalMono = regionalMono.doOnSubscribe(subscription -> logger.debug(
                    "STARTING to process {} operation in region '{}'",
                    operationType,
                    region));
            }

            if (isHedgedRequest) {
                // The k-th region (0-based, k >= 1) starts after threshold + (k - 1) * thresholdStep.
                // The doOnSubscribe above is applied first so the debug log fires when the delayed
                // subscription actually happens.
                Duration delayForCrossRegionalRetry = availabilityStrategy
                    .getThreshold()
                    .plus(availabilityStrategy
                        .getThresholdStep()
                        .multipliedBy(monoList.size() - 1));

                regionalMono = regionalMono.delaySubscription(delayForCrossRegionalRetry);
            }

            monoList.add(regionalMono);
        }

        // NOTE(review): the following comment refers to doFinally/doOnCancel diagnostics merging,
        // which is not performed in this method - confirm whether it still applies here.
        // Merging the diagnosticsFactory cannot happen only in the doFinally operator, because
        // doFinally is a side-effect method: it executes concurrently with firing the
        // onComplete/onError signal, and it is also triggered by cancellation. To make sure
        // merging the Context happens synchronously in line, merging has to happen on
        // error/completion and also in doOnCancel.
        return Mono
            .firstWithValue(monoList)
            .flatMap(nonTransientResult -> {
                if (nonTransientResult.isError()) {
                    return Mono.error(nonTransientResult.exception);
                }

                return Mono.just(nonTransientResult.response);
            })
            .onErrorMap(throwable -> unwrapFeedOperationFailureAcrossRegions(
                throwable,
                orderedApplicableRegionsForSpeculation));
    }

    /**
     * Creates the cross-region availability context attached to a single feed request.
     *
     * @param partitionKeyRangesWithSuccess map shared by the circuit-breaker contexts of all requests
     *     of one feed operation
     * @param isInAvailabilityStrategyFlow whether the request is part of the availability-strategy
     *     (speculation) flow. Passed as the flag to both FeedOperationContextForCircuitBreaker and
     *     AvailabilityStrategyContext.
     * @param isHedgedRequest whether the request is a hedged request (not the initial one)
     * @param collectionLink collection link recorded in the circuit-breaker context
     * @return a new context. The second constructor argument is {@code null}, as in every feed path
     *     of this class.
     */
    private CrossRegionAvailabilityContextForRxDocumentServiceRequest createCrossRegionAvailabilityContextForFeedRequest(
        Map<PartitionKeyRangeWrapper, PartitionKeyRangeWrapper> partitionKeyRangesWithSuccess,
        boolean isInAvailabilityStrategyFlow,
        boolean isHedgedRequest,
        String collectionLink) {

        FeedOperationContextForCircuitBreaker feedOperationContextForCircuitBreaker =
            new FeedOperationContextForCircuitBreaker(
                partitionKeyRangesWithSuccess,
                isInAvailabilityStrategyFlow,
                collectionLink);

        feedOperationContextForCircuitBreaker.setIsRequestHedged(isHedgedRequest);

        return new CrossRegionAvailabilityContextForRxDocumentServiceRequest(
            feedOperationContextForCircuitBreaker,
            null,
            new AvailabilityStrategyContext(isInAvailabilityStrategyFlow, isHedgedRequest));
    }

    /**
     * Wraps a regional feed operation so that final outcomes are emitted as values.
     *
     * <p>Responses and the selected CosmosExceptions are mapped to a
     * {@code NonTransientFeedOperationResult}. This ensures {@code Mono.firstWithValue} completes the
     * composite Mono for both successful values and final errors. The result is passed through the
     * circuit-breaking feedback handling for availability-strategy feed operations.
     *
     * @param feedOperationMono the regional feed operation
     * @param treatAllCosmosExceptionsAsNonTransient {@code true} to map every CosmosException to a value
     *     (initial request); {@code false} to map only non-transient ones (hedged requests)
     * @param request the (cloned) request the operation was built for
     * @param <T> the response type
     * @return the wrapped regional Mono
     */
    private <T> Mono<NonTransientFeedOperationResult<T>> toNonTransientFeedOperationResultMono(
        Mono<T> feedOperationMono,
        boolean treatAllCosmosExceptionsAsNonTransient,
        RxDocumentServiceRequest request) {

        Mono<NonTransientFeedOperationResult<T>> nonTransientResultMono = feedOperationMono
            .map(NonTransientFeedOperationResult::new)
            .onErrorResume(
                throwable -> treatAllCosmosExceptionsAsNonTransient
                    ? isCosmosException(throwable)
                    : isNonTransientCosmosException(throwable),
                throwable -> Mono.just(
                    new NonTransientFeedOperationResult<>(
                        Utils.as(Exceptions.unwrap(throwable), CosmosException.class))));

        return handleCircuitBreakingFeedbackForFeedOperationWithAvailabilityStrategy(nonTransientResultMono, request);
    }

    /**
     * Maps the error raised when no regional Mono produced a value to the most useful exception.
     *
     * <p>For a {@link NoSuchElementException} (raised by {@code Mono.firstWithValue} when no source
     * emitted a value), the inner errors are inspected in region order. The first CosmosException is
     * returned. Empty completions are logged at trace level, and other exception types at warn level.
     * The inner errors are presumed to be listed in the same order as the regions - the log messages
     * rely on that.
     *
     * @param throwable the error signalled by the composite Mono
     * @param orderedApplicableRegionsForSpeculation regions in the same order as the regional Monos
     * @return the first inner CosmosException, otherwise the unwrapped {@code throwable}
     */
    private Throwable unwrapFeedOperationFailureAcrossRegions(
        Throwable throwable,
        List<String> orderedApplicableRegionsForSpeculation) {

        Throwable exception = Exceptions.unwrap(throwable);

        if (!(exception instanceof NoSuchElementException)) {
            return exception;
        }

        List<Throwable> innerThrowables = Exceptions.unwrapMultiple(exception.getCause());

        for (int index = 0; index < innerThrowables.size(); index++) {
            Throwable innerException = Exceptions.unwrap(innerThrowables.get(index));

            // return the first CosmosException instance (in region order) bubbling up for a region
            if (innerException instanceof CosmosException) {
                return Utils.as(innerException, CosmosException.class);
            }

            if (innerException instanceof NoSuchElementException) {
                logger.trace(
                    "Operation in {} completed with empty result because it was cancelled.",
                    orderedApplicableRegionsForSpeculation.get(index));
            } else if (logger.isWarnEnabled()) {
                String message = "Unexpected Non-CosmosException when processing operation in '"
                    + orderedApplicableRegionsForSpeculation.get(index)
                    + "'.";
                logger.warn(
                    message,
                    innerException
                );
            }
        }

        return exception;
    }