in server/src/main/java/com/jetbrains/youtrackdb/internal/server/plugin/gremlin/YTDBAbstractOpProcessor.java [759:919]
private void iterateBytecodeTraversal(final Context context)
throws Exception {
final var msg = context.getRequestMessage();
final var settings = context.getSettings();
logger.debug("Traversal request {} for in thread {}", msg.getRequestId(),
Thread.currentThread().getName());
// validateTraversalRequest() ensures that this is of type Bytecode
final var bytecodeObj = msg.getArgs().get(Tokens.ARGS_GREMLIN);
final var bytecode = (Bytecode) bytecodeObj;
// timeout override - handle both deprecated and newly named configuration. earlier logic should prevent
// both configurations from being submitted at the same time
final var args = msg.getArgs();
final var seto = args.containsKey(Tokens.ARGS_EVAL_TIMEOUT) ?
((Number) args.get(Tokens.ARGS_EVAL_TIMEOUT)).longValue()
: context.getSettings().getEvaluationTimeout();
try {
final var lambdaLanguage = BytecodeHelper.getLambdaLanguage(bytecode);
if (lambdaLanguage.isPresent()) {
var errorMsg = "Execution of lambada code is not allowed on server side.";
throw new OpProcessorException("Traversal contains a lambda that cannot be compiled",
ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
.statusMessage(errorMsg)
.create());
}
} catch (Exception ex) {
logger.error("Could not deserialize the Traversal instance", ex);
throw new OpProcessorException("Could not deserialize the Traversal instance",
ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION)
.statusMessage(ex.getMessage())
.statusAttributeException(ex).create());
}
if (settings.enableAuditLog) {
var user = context.getChannelHandlerContext().channel().attr(StateKey.AUTHENTICATED_USER)
.get();
if (null == user) { // This is expected when using the AllowAllAuthenticator
user = AuthenticatedUser.ANONYMOUS_USER;
}
var address = context.getChannelHandlerContext().channel().remoteAddress().toString();
if (!address.isEmpty() && address.charAt(0) == '/' && address.length() > 1) {
address = address.substring(1);
}
auditLogger.info("User {} with address {} requested: {}", user.getName(), address, bytecode);
}
var traversalSource = getTraversalSource(context);
// handle bytecode based graph operations like commit/rollback commands
if (BytecodeHelper.isGraphOperation(bytecode)) {
handleGraphOperation(bytecode, traversalSource, context);
return;
}
final var timerContext = traversalOpTimer.time();
final var evalFuture = new FutureTask<Void>(() -> {
context.setStartedResponse();
try {
var traversal = JavaTranslator.of(traversalSource).translate(bytecode);
var ok = false;
try {
// compile the traversal - without it getEndStep() has nothing in it
traversal.applyStrategies();
handleIterator(context, new TraverserIterator(traversal), traversalSource);
ok = true;
} catch (Exception ex) {
Throwable t = ex;
if (ex instanceof UndeclaredThrowableException) {
t = t.getCause();
}
CloseableIterator.closeIterator(traversal);
// if any exception in the chain is TemporaryException or Failure then we should respond with the
// right error code so that the client knows to retry
final var possibleSpecialException = YTDBAbstractOpProcessor.determineIfSpecialException(
ex);
if (possibleSpecialException.isPresent()) {
final var special = possibleSpecialException.get();
final var specialResponseMsg = ResponseMessage.build(msg).
statusMessage(special.getMessage()).
statusAttributeException(special);
if (special instanceof TemporaryException) {
specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_TEMPORARY);
} else if (special instanceof Failure failure) {
specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_FAIL_STEP).
statusAttribute(Tokens.STATUS_ATTRIBUTE_FAIL_STEP_MESSAGE, failure.format());
}
context.writeAndFlush(specialResponseMsg.create());
} else if (t instanceof InterruptedException
|| t instanceof TraversalInterruptedException) {
final var errorMessage = String.format(
"A timeout occurred during traversal evaluation of [%s] - consider increasing the limit given to evaluationTimeout",
msg);
logger.warn(errorMessage);
context.writeAndFlush(
ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
.statusMessage(errorMessage)
.statusAttributeException(ex).create());
} else {
logger.warn(
String.format("Exception processing a Traversal on iteration for request [%s].",
msg.getRequestId()), ex);
context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
.statusMessage(ex.getMessage())
.statusAttributeException(ex).create());
}
} finally {
if (!ok) {
var tx = traversalSource.tx();
if (tx.isOpen()) {
tx.rollback();
}
}
}
} catch (Throwable t) {
// if any exception in the chain is TemporaryException or Failure then we should respond with the
// right error code so that the client knows to retry
final var possibleSpecialException = YTDBAbstractOpProcessor.determineIfSpecialException(t);
if (possibleSpecialException.isPresent()) {
final var special = possibleSpecialException.get();
final var specialResponseMsg = ResponseMessage.build(msg).
statusMessage(special.getMessage()).
statusAttributeException(special);
if (special instanceof TemporaryException) {
specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_TEMPORARY);
} else if (special instanceof Failure failure) {
specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_FAIL_STEP).
statusAttribute(Tokens.STATUS_ATTRIBUTE_FAIL_STEP_MESSAGE, failure.format());
}
context.writeAndFlush(specialResponseMsg.create());
} else {
logger.warn(String.format("Exception processing a Traversal on request [%s].",
msg.getRequestId()), t);
context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
.statusMessage(t.getMessage())
.statusAttributeException(t).create());
if (t instanceof Error) {
//Re-throw any errors to be handled by and set as the result of evalFuture
throw t;
}
}
} finally {
timerContext.stop();
// There is a race condition that this query may have finished before the timeoutFuture was created,
// though this is very unlikely. This is handled in the settor, if this has already been grabbed.
// If we passed this point and the setter hasn't been called, it will cancel the timeoutFuture inside
// the setter to compensate.
final var timeoutFuture = context.getTimeoutExecutor();
if (null != timeoutFuture) {
timeoutFuture.cancel(true);
}
}
return null;
});
submitToGremlinExecutor(context, traversalSource, seto, evalFuture);
}