in core/src/main/java/com/datastax/dse/driver/internal/core/cql/continuous/ContinuousRequestHandlerBase.java [1023:1108]
/**
 * Reacts to an {@code Unprepared} error response: re-prepares the statement over the same
 * channel, then decides from the outcome whether to retry the request or fail it.
 *
 * <p>The completion callback distinguishes these outcomes:
 *
 * <ul>
 *   <li>re-prepare succeeded and returned the expected ID: the request is sent again to the
 *       same node;
 *   <li>re-prepare succeeded but returned a different ID: the request is aborted with an
 *       {@link IllegalStateException};
 *   <li>re-prepare failed with a server error that converts to a {@code
 *       QueryValidationException}, {@code FunctionFailureException} or {@code ProtocolError},
 *       or failed with a {@code RequestThrottlingException}: the request is aborted with that
 *       error;
 *   <li>any other failure: the error is recorded and the request is sent again with a {@code
 *       null} node (which the log message describes as trying the next node).
 * </ul>
 *
 * <p>Must be invoked while the current thread holds {@code lock} (checked by an assertion).
 *
 * @param errorMessage the response carrying the ID of the statement to re-prepare.
 * @throws IllegalStateException if the session holds no re-prepare payload for that ID.
 */
private void processUnprepared(@NonNull Unprepared errorMessage) {
  assert lock.isHeldByCurrentThread();
  ByteBuffer idToReprepare = ByteBuffer.wrap(errorMessage.id);
  // Rendered once; used by both the trace below and the "missing payload" failure.
  String idAsHex = Bytes.toHexString(idToReprepare);
  LOG.trace("[{}] Statement {} is not prepared on {}, re-preparing", logPrefix, idAsHex, node);

  RepreparePayload payload = session.getRepreparePayloads().get(idToReprepare);
  if (payload == null) {
    throw new IllegalStateException(
        String.format(
            "Tried to execute unprepared query %s but we don't have the data to re-prepare it",
            idAsHex));
  }

  Prepare prepareMessage = payload.toMessage();
  Duration requestTimeout = executionProfile.getDuration(DefaultDriverOption.REQUEST_TIMEOUT);
  ThrottledAdminRequestHandler.prepare(
          channel,
          true,
          prepareMessage,
          payload.customPayload,
          requestTimeout,
          throttler,
          sessionMetricUpdater,
          logPrefix)
      .start()
      .whenComplete(
          (newId, failure) -> {
            // Stays null while the outcome is retryable; once set, the error is surfaced to
            // the client at the end of the callback instead of retrying.
            Throwable unrecoverable = null;
            if (failure != null) {
              if (failure instanceof UnexpectedResponseException) {
                Message response = ((UnexpectedResponseException) failure).message;
                if (response instanceof Error) {
                  CoordinatorException serverError =
                      DseConversions.toThrowable(node, (Error) response, context);
                  boolean mustRethrow =
                      serverError instanceof QueryValidationException
                          || serverError instanceof FunctionFailureException
                          || serverError instanceof ProtocolError;
                  if (mustRethrow) {
                    LOG.trace("[{}] Unrecoverable error on re-prepare, rethrowing", logPrefix);
                    trackNodeError(node, serverError);
                    unrecoverable = serverError;
                  }
                }
              } else if (failure instanceof RequestThrottlingException) {
                trackNodeError(node, failure);
                unrecoverable = failure;
              }
              if (unrecoverable == null) {
                // Nothing fatal was identified above: record the failure and send again
                // without pinning a node.
                LOG.trace("[{}] Re-prepare failed, trying next node", logPrefix);
                recordError(node, failure);
                trackNodeError(node, failure);
                sendRequest(statement, null, executionIndex, retryCount, false);
              }
            } else if (newId.equals(idToReprepare)) {
              LOG.trace(
                  "[{}] Re-prepare successful, retrying on the same node ({})", logPrefix, node);
              sendRequest(statement, node, executionIndex, retryCount, false);
            } else {
              IllegalStateException idMismatch =
                  new IllegalStateException(
                      String.format(
                          "ID mismatch while trying to reprepare (expected %s, got %s). "
                              + "This prepared statement won't work anymore. "
                              + "This usually happens when you run a 'USE...' query after "
                              + "the statement was prepared.",
                          Bytes.toHexString(idToReprepare), Bytes.toHexString(newId)));
              trackNodeError(node, idMismatch);
              unrecoverable = idMismatch;
            }

            if (unrecoverable != null) {
              // The callback acquires the lock itself before aborting.
              lock.lock();
              try {
                abort(unrecoverable, true);
              } finally {
                lock.unlock();
              }
            }
          });
}