in src/java/org/apache/cassandra/service/reads/ReadCallback.java [133:213]
/**
 * Waits (up to the command's timeout) for this read to be signaled and converts any unsuccessful
 * outcome into an exception.
 * <p>
 * Returns normally only when the wait was signaled, the read is not considered failed, and the replica
 * plan still applies to the current cluster metadata. In every other case this records any coordinator
 * warnings, traces or debug-logs the outcome, lets the warnings snapshot decide whether to abort, and
 * then throws.
 *
 * @throws RetryOnDifferentSystemException if the read did not time out, every recorded failure is
 *         retriable, and at least one is {@code RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM}
 * @throws CoordinatorBehindException if the read did not time out and every recorded failure is
 *         {@code COORDINATOR_BEHIND}
 * @throws ReadFailureException if the read failed (or its plan no longer applies) and was not a timeout
 * @throws ReadTimeoutException if the wait was not signaled, or every recorded replica failure was a timeout
 */
public void awaitResults() throws ReadFailureException, ReadTimeoutException
{
    boolean signaled = await(command.getTimeout(MILLISECONDS), TimeUnit.MILLISECONDS);
    /*
     * Here we are checking isDataPresent in addition to the responses size because there is a possibility
     * that an asynchronous speculative execution request could be returning after a local failure already
     * signaled. Responses may have been set while the data reference is not yet.
     * See DigestResolver#preprocess(Message)
     * CASSANDRA-16097
     */
    int received = resolver.responses.size();
    boolean failed = failures > 0 && (replicaPlan().readQuorum() > received || !resolver.isDataPresent());

    WarningContext warnings = warningContext;
    // save the snapshot so abort state is not changed between now and when mayAbort gets called
    WarningsSnapshot snapshot = null;
    if (warnings != null)
    {
        snapshot = warnings.snapshot();
        // this is possible due to a race condition between waiting and responding
        // network thread creates the WarningContext to update metrics, but we are actively reading and see it is empty
        // this is likely to happen when a timeout happens or from a speculative response
        if (!snapshot.isEmpty())
            CoordinatorWarnings.update(command, snapshot);
    }

    if (signaled && !failed && replicaPlan().stillAppliesTo(ClusterMetadata.current()))
        return;

    // Failure path from here on.
    // failures keeps incrementing, and this.failureReasonByEndpoint keeps getting new entries after signaling.
    // Take ONE snapshot and use it for the timeout classification, the abort check, the retry decision and
    // the thrown exception, so all of them reason about the same set of failures.
    Map<InetAddressAndPort, RequestFailureReason> failureReasons = ImmutableMap.copyOf(failureReasonByEndpoint);

    // If all messages came back as a TIMEOUT then signaled=true and failed=true.
    // Need to distinguish between a timeout and a failure (network, bad data, etc.), so store an extra field.
    // see CASSANDRA-17828
    boolean timedout = failed ? RequestCallback.isTimeout(failureReasons) : !signaled;

    // Snapshot the reporting inputs once so the trace/log line, the abort check and the exception agree.
    int blockFor = replicaPlan().readQuorum();
    boolean dataPresent = resolver.isDataPresent();

    boolean tracing = isTracing();
    if (tracing || logger.isDebugEnabled())
    {
        String outcome = timedout ? "Timed out" : "Failed";
        String gotData = received > 0 ? (dataPresent ? " (including data)" : " (only digests)") : "";
        if (tracing)
            Tracing.trace("{}; received {} of {} responses{}", outcome, received, blockFor, gotData);
        else
            logger.debug("{}; received {} of {} responses{}", outcome, received, blockFor, gotData);
    }

    if (snapshot != null)
        snapshot.maybeAbort(command, replicaPlan().consistencyLevel(), received, blockFor, dataPresent, failureReasons);

    int transactionRetryErrors = 0;
    int coordinatorBehindErrors = 0;
    for (RequestFailureReason reason : failureReasons.values())
    {
        if (reason == RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM)
            transactionRetryErrors++;
        if (reason == COORDINATOR_BEHIND)
            coordinatorBehindErrors++;
    }
    int totalRetriableFailures = transactionRetryErrors + coordinatorBehindErrors;
    // TODO (nicetohave): This could be smarter and check if retrying would succeed instead of pessimistically
    // failing unless all errors are retriable
    if (!timedout && totalRetriableFailures > 0 && totalRetriableFailures == failureReasons.size())
    {
        // Doesn't matter which we throw really but for clarity/metrics be specific
        // Retrying on the correct system might make this read succeed
        if (transactionRetryErrors > 0)
            throw new RetryOnDifferentSystemException();
        if (coordinatorBehindErrors > 0)
            throw new CoordinatorBehindException("Read request failed due to coordinator behind");
    }

    // Same as for writes, see AbstractWriteResponseHandler
    throw !timedout
        ? new ReadFailureException(replicaPlan().consistencyLevel(), received, blockFor, dataPresent, failureReasons)
        : new ReadTimeoutException(replicaPlan().consistencyLevel(), received, blockFor, dataPresent);
}