in core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java [663:820]
/**
 * Handles an error response received for the current execution.
 *
 * <p>The handling depends on the kind of error:
 *
 * <ul>
 *   <li>{@code UNPREPARED}: the statement is re-prepared on the same channel using the payload
 *       cached in the session, then the request is sent again to the same node. If re-preparing
 *       fails with an unrecoverable error (or is throttled), or if the re-prepared id differs
 *       from the expected one, the request fails; on any other re-prepare failure the next node
 *       of the query plan is tried.
 *   <li>{@link BootstrappingException}: the error is recorded and the next node is tried.
 *   <li>{@link QueryValidationException}, {@link FunctionFailureException}, {@link
 *       ProtocolError}: the error is surfaced to the client without consulting the retry policy.
 *   <li>Anything else: the retry policy is asked for a verdict (for write timeouts and generic
 *       errors only if the statement is idempotent, otherwise the verdict is {@code RETHROW}),
 *       the matching node metrics are updated, and the verdict is applied.
 * </ul>
 *
 * @param errorMessage the error message returned by the node.
 * @throws IllegalStateException if the node reports the statement as unprepared but the session
 *     holds no payload to re-prepare it.
 */
private void processErrorResponse(Error errorMessage) {
  if (errorMessage.code == ProtocolConstants.ErrorCode.UNPREPARED) {
    ByteBuffer idToReprepare = ByteBuffer.wrap(((Unprepared) errorMessage).id);
    LOG.trace(
        "[{}] Statement {} is not prepared on {}, repreparing",
        logPrefix,
        Bytes.toHexString(idToReprepare),
        node);
    RepreparePayload repreparePayload = session.getRepreparePayloads().get(idToReprepare);
    if (repreparePayload == null) {
      throw new IllegalStateException(
          String.format(
              "Tried to execute unprepared query %s but we don't have the data to reprepare it",
              Bytes.toHexString(idToReprepare)));
    }
    Prepare reprepareMessage = repreparePayload.toMessage();
    ThrottledAdminRequestHandler<ByteBuffer> reprepareHandler =
        ThrottledAdminRequestHandler.prepare(
            channel,
            true,
            reprepareMessage,
            repreparePayload.customPayload,
            Conversions.resolveRequestTimeout(statement, executionProfile),
            throttler,
            sessionMetricUpdater,
            logPrefix);
    reprepareHandler
        .start()
        .handle(
            (repreparedId, exception) -> {
              if (exception != null) {
                // If the error is not recoverable, surface it to the client instead of retrying
                if (exception instanceof UnexpectedResponseException) {
                  Message prepareErrorMessage =
                      ((UnexpectedResponseException) exception).message;
                  if (prepareErrorMessage instanceof Error) {
                    CoordinatorException prepareError =
                        Conversions.toThrowable(node, (Error) prepareErrorMessage, context);
                    if (prepareError instanceof QueryValidationException
                        || prepareError instanceof FunctionFailureException
                        || prepareError instanceof ProtocolError) {
                      LOG.trace("[{}] Unrecoverable error on reprepare, rethrowing", logPrefix);
                      trackNodeError(node, prepareError, NANOTIME_NOT_MEASURED_YET);
                      setFinalError(statement, prepareError, node, execution);
                      return null;
                    }
                  }
                } else if (exception instanceof RequestThrottlingException) {
                  trackNodeError(node, exception, NANOTIME_NOT_MEASURED_YET);
                  setFinalError(statement, exception, node, execution);
                  return null;
                }
                // Any other failure: give up on this node and move on to the next one
                recordError(node, exception);
                trackNodeError(node, exception, NANOTIME_NOT_MEASURED_YET);
                LOG.trace("[{}] Reprepare failed, trying next node", logPrefix);
                sendRequest(statement, null, queryPlan, execution, retryCount, false);
              } else {
                if (!repreparedId.equals(idToReprepare)) {
                  IllegalStateException illegalStateException =
                      new IllegalStateException(
                          String.format(
                              "ID mismatch while trying to reprepare (expected %s, got %s). "
                                  + "This prepared statement won't work anymore. "
                                  + "This usually happens when you run a 'USE...' query after "
                                  + "the statement was prepared.",
                              Bytes.toHexString(idToReprepare),
                              Bytes.toHexString(repreparedId)));
                  trackNodeError(node, illegalStateException, NANOTIME_NOT_MEASURED_YET);
                  setFinalError(statement, illegalStateException, node, execution);
                  // The request has just been failed: stop here rather than logging a
                  // successful reprepare and sending the statement again.
                  return null;
                }
                LOG.trace("[{}] Reprepare sucessful, retrying", logPrefix);
                // Same node: the statement is now prepared there
                sendRequest(statement, node, queryPlan, execution, retryCount, false);
              }
              return null;
            });
    return;
  }
  CoordinatorException error = Conversions.toThrowable(node, errorMessage, context);
  NodeMetricUpdater metricUpdater = ((DefaultNode) node).getMetricUpdater();
  if (error instanceof BootstrappingException) {
    LOG.trace("[{}] {} is bootstrapping, trying next node", logPrefix, node);
    recordError(node, error);
    trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET);
    sendRequest(statement, null, queryPlan, execution, retryCount, false);
  } else if (error instanceof QueryValidationException
      || error instanceof FunctionFailureException
      || error instanceof ProtocolError) {
    // These errors bypass the retry policy and are reported to the client directly
    LOG.trace("[{}] Unrecoverable error, rethrowing", logPrefix);
    metricUpdater.incrementCounter(DefaultNodeMetric.OTHER_ERRORS, executionProfile.getName());
    trackNodeError(node, error, NANOTIME_NOT_MEASURED_YET);
    setFinalError(statement, error, node, execution);
  } else {
    RetryPolicy retryPolicy = Conversions.resolveRetryPolicy(context, executionProfile);
    RetryVerdict verdict;
    if (error instanceof ReadTimeoutException) {
      ReadTimeoutException readTimeout = (ReadTimeoutException) error;
      verdict =
          retryPolicy.onReadTimeoutVerdict(
              statement,
              readTimeout.getConsistencyLevel(),
              readTimeout.getBlockFor(),
              readTimeout.getReceived(),
              readTimeout.wasDataPresent(),
              retryCount);
      updateErrorMetrics(
          metricUpdater,
          verdict,
          DefaultNodeMetric.READ_TIMEOUTS,
          DefaultNodeMetric.RETRIES_ON_READ_TIMEOUT,
          DefaultNodeMetric.IGNORES_ON_READ_TIMEOUT);
    } else if (error instanceof WriteTimeoutException) {
      WriteTimeoutException writeTimeout = (WriteTimeoutException) error;
      // The retry policy is only consulted for idempotent statements
      verdict =
          Conversions.resolveIdempotence(statement, executionProfile)
              ? retryPolicy.onWriteTimeoutVerdict(
                  statement,
                  writeTimeout.getConsistencyLevel(),
                  writeTimeout.getWriteType(),
                  writeTimeout.getBlockFor(),
                  writeTimeout.getReceived(),
                  retryCount)
              : RetryVerdict.RETHROW;
      updateErrorMetrics(
          metricUpdater,
          verdict,
          DefaultNodeMetric.WRITE_TIMEOUTS,
          DefaultNodeMetric.RETRIES_ON_WRITE_TIMEOUT,
          DefaultNodeMetric.IGNORES_ON_WRITE_TIMEOUT);
    } else if (error instanceof UnavailableException) {
      UnavailableException unavailable = (UnavailableException) error;
      verdict =
          retryPolicy.onUnavailableVerdict(
              statement,
              unavailable.getConsistencyLevel(),
              unavailable.getRequired(),
              unavailable.getAlive(),
              retryCount);
      updateErrorMetrics(
          metricUpdater,
          verdict,
          DefaultNodeMetric.UNAVAILABLES,
          DefaultNodeMetric.RETRIES_ON_UNAVAILABLE,
          DefaultNodeMetric.IGNORES_ON_UNAVAILABLE);
    } else {
      // The retry policy is only consulted for idempotent statements
      verdict =
          Conversions.resolveIdempotence(statement, executionProfile)
              ? retryPolicy.onErrorResponseVerdict(statement, error, retryCount)
              : RetryVerdict.RETHROW;
      updateErrorMetrics(
          metricUpdater,
          verdict,
          DefaultNodeMetric.OTHER_ERRORS,
          DefaultNodeMetric.RETRIES_ON_OTHER_ERROR,
          DefaultNodeMetric.IGNORES_ON_OTHER_ERROR);
    }
    processRetryVerdict(verdict, error);
  }
}