public void awaitResults()

in src/java/org/apache/cassandra/service/reads/ReadCallback.java [133:213]


    /**
     * Waits for the replica responses of this read, then either returns normally or throws to explain why the
     * read could not be satisfied.
     * <p>
     * The wait is bounded by {@code command.getTimeout(MILLISECONDS)}. Any coordinator warnings gathered so far
     * are published via {@code CoordinatorWarnings.update} before the outcome is decided. The method returns
     * normally only if all of the following hold:
     * <ul>
     *   <li>the wait was signaled;</li>
     *   <li>no failure condition was detected. A failure is at least one recorded replica failure combined with
     *       either fewer than {@code readQuorum()} responses or no data response;</li>
     *   <li>{@code replicaPlan().stillAppliesTo(ClusterMetadata.current())} returns {@code true}.</li>
     * </ul>
     * If that last check returns {@code false}, the read is treated as failed even though it was signaled.
     * <p>
     * On the unsuccessful path the warnings snapshot (if any) is first handed to {@code maybeAbort}. Then, if the
     * read did not time out and every recorded failure reason is {@code RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM} or
     * {@code COORDINATOR_BEHIND}, a retriable exception is thrown, because a retry might succeed. That exception is
     * {@code RetryOnDifferentSystemException} if any replica reported the former, otherwise
     * {@code CoordinatorBehindException}.
     *
     * @throws ReadFailureException if the read was unsuccessful for a reason other than a timeout
     * @throws ReadTimeoutException if the wait was not signaled, or if every recorded failure is classified as a
     *                              timeout by {@code RequestCallback.isTimeout} (see CASSANDRA-17828)
     */
    public void awaitResults() throws ReadFailureException, ReadTimeoutException
    {
        boolean signaled = await(command.getTimeout(MILLISECONDS), TimeUnit.MILLISECONDS);
        /*
         * Here we are checking isDataPresent in addition to the responses size because there is a possibility
         * that an asynchronous speculative execution request could be returning after a local failure already
         * signaled. Responses may have been set while the data reference is not yet.
         * See DigestResolver#preprocess(Message)
         * CASSANDRA-16097
         */
        int received = resolver.responses.size();
        boolean failed = failures > 0 && (replicaPlan().readQuorum() > received || !resolver.isDataPresent());

        WarningContext warnings = warningContext;
        // save the snapshot so abort state is not changed between now and when maybeAbort gets called
        WarningsSnapshot snapshot = null;
        if (warnings != null)
        {
            snapshot = warnings.snapshot();
            // this is possible due to a race condition between waiting and responding
            // network thread creates the WarningContext to update metrics, but we are actively reading and see it is empty
            // this is likely to happen when a timeout happens or from a speculative response
            if (!snapshot.isEmpty())
                CoordinatorWarnings.update(command, snapshot);
        }

        if (signaled && !failed && replicaPlan().stillAppliesTo(ClusterMetadata.current()))
            return;

        // failures keeps incrementing, and this.failureReasonByEndpoint keeps getting new entries after signaling.
        // Take ONE immutable snapshot and use it for every decision and report below (timeout classification,
        // abort check, retry classification, thrown exception) so they all agree with each other.
        // It is taken only after the success check so the common (successful) path does not pay for the copy.
        final Map<InetAddressAndPort, RequestFailureReason> failureReasons = ImmutableMap.copyOf(this.failureReasonByEndpoint);

        // If all messages came back as a TIMEOUT then signaled=true and failed=true.
        // Need to distinguish between a timeout and a failure (network, bad data, etc.), so store an extra field.
        // see CASSANDRA-17828
        boolean timedout = failed ? RequestCallback.isTimeout(failureReasons) : !signaled;

        // Capture once so tracing, the abort check and the thrown exception all report the same view.
        int blockFor = replicaPlan().readQuorum();
        boolean dataPresent = resolver.isDataPresent();

        String outcome = timedout ? "Timed out" : "Failed";
        String gotData = received > 0 ? (dataPresent ? " (including data)" : " (only digests)") : "";
        if (isTracing())
            Tracing.trace("{}; received {} of {} responses{}", outcome, received, blockFor, gotData);
        else if (logger.isDebugEnabled())
            logger.debug("{}; received {} of {} responses{}", outcome, received, blockFor, gotData);

        if (snapshot != null)
            snapshot.maybeAbort(command, replicaPlan().consistencyLevel(), received, blockFor, dataPresent, failureReasons);

        int transactionRetryErrors = 0;
        int coordinatorBehindErrors = 0;
        for (RequestFailureReason reason : failureReasons.values())
        {
            if (reason == RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM)
                transactionRetryErrors++;
            else if (reason == COORDINATOR_BEHIND)
                coordinatorBehindErrors++;
        }
        int totalRetriableFailures = transactionRetryErrors + coordinatorBehindErrors;

        // TODO (nicetohave): This could be smarter and check if retrying would succeed instead of pessimistically
        // failing unless all errors are retriable
        if (!timedout && totalRetriableFailures > 0 && totalRetriableFailures == failureReasons.size())
        {
            // Doesn't matter which we throw really but for clarity/metrics be specific.
            // Retrying on the correct system might make this read succeed.
            if (transactionRetryErrors > 0)
                throw new RetryOnDifferentSystemException();
            // Every retriable failure was COORDINATOR_BEHIND at this point.
            throw new CoordinatorBehindException("Read request failed due to coordinator behind");
        }

        // Same as for writes, see AbstractWriteResponseHandler
        throw !timedout
            ? new ReadFailureException(replicaPlan().consistencyLevel(), received, blockFor, dataPresent, failureReasons)
            : new ReadTimeoutException(replicaPlan().consistencyLevel(), received, blockFor, dataPresent);
    }