in gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/op/traversal/TraversalOpProcessor.java [162:322]
/**
 * Turns the bytecode carried by the request into a traversal, then submits its iteration to the Gremlin
 * executor's thread pool, scheduling a timeout task when the effective evaluation timeout is positive.
 * <p>
 * Translation happens on the calling thread; strategy application and iteration happen inside the submitted
 * task. Errors raised inside the task are reported to the client from within the task itself.
 *
 * @param context the request context supplying the message, settings, graph manager and executors
 * @throws OpProcessorException if the bytecode cannot be converted to a traversal, either because a lambda
 *                              fails to compile or because translation fails for any other reason
 * @throws Exception declared by the signature for callers; no other checked exception is thrown explicitly here
 */
private void iterateBytecodeTraversal(final Context context) throws Exception {
    final RequestMessage msg = context.getRequestMessage();
    final Settings settings = context.getSettings();
    logger.debug("Traversal request {} for in thread {}", msg.getRequestId(), Thread.currentThread().getName());

    final Map<String, Object> args = msg.getArgs();

    // validateTraversalRequest() ensures that this is of type Bytecode
    final Bytecode bytecode = (Bytecode) args.get(Tokens.ARGS_GREMLIN);

    // earlier validation in selection of this op method should free us to cast this without worry
    @SuppressWarnings("unchecked")
    final Map<String, String> aliases = (Map<String, String>) msg.optionalArgs(Tokens.ARGS_ALIASES).get();

    // a per-request timeout overrides the server-wide evaluation timeout when it is supplied
    final long timeoutMillis = args.containsKey(Tokens.ARGS_EVAL_TIMEOUT) ?
            ((Number) args.get(Tokens.ARGS_EVAL_TIMEOUT)).longValue() : settings.getEvaluationTimeout();

    final GraphManager graphManager = context.getGraphManager();
    // only the first alias is consulted to pick the traversal source
    final String traversalSourceName = aliases.entrySet().iterator().next().getValue();
    final TraversalSource g = graphManager.getTraversalSource(traversalSourceName);

    final Traversal.Admin<?, ?> traversal;
    try {
        // bytecode without lambdas is translated directly; otherwise it is evaluated by the script engine
        // registered for the lambda's language
        final Optional<String> lambdaLanguage = BytecodeHelper.getLambdaLanguage(bytecode);
        if (!lambdaLanguage.isPresent())
            traversal = JavaTranslator.of(g).translate(bytecode);
        else
            traversal = context.getGremlinExecutor().eval(bytecode, EMPTY_BINDINGS, lambdaLanguage.get(), traversalSourceName);
    } catch (ScriptException ex) {
        logger.error("Traversal contains a lambda that cannot be compiled", ex);
        throw new OpProcessorException("Traversal contains a lambda that cannot be compiled",
                ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
                        .statusMessage(ex.getMessage())
                        .statusAttributeException(ex).create());
    } catch (Exception ex) {
        logger.error("Could not deserialize the Traversal instance", ex);
        throw new OpProcessorException("Could not deserialize the Traversal instance",
                ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION)
                        .statusMessage(ex.getMessage())
                        .statusAttributeException(ex).create());
    }

    if (settings.enableAuditLog) {
        AuthenticatedUser user = context.getChannelHandlerContext().channel().attr(StateKey.AUTHENTICATED_USER).get();
        if (null == user) { // This is expected when using the AllowAllAuthenticator
            user = AuthenticatedUser.ANONYMOUS_USER;
        }
        String address = context.getChannelHandlerContext().channel().remoteAddress().toString();
        // drop the leading slash of the socket address' string form
        if (address.startsWith("/") && address.length() > 1) address = address.substring(1);
        auditLogger.info("User {} with address {} requested: {}", user.getName(), address, bytecode);
    }

    final Timer.Context timerContext = traversalOpTimer.time();
    final FutureTask<Void> evalFuture = new FutureTask<>(() -> {
        context.setStartedResponse();
        final Graph graph = g.getGraph();

        try {
            beforeProcessing(graph, context);

            try {
                // compile the traversal - without it getEndStep() has nothing in it
                traversal.applyStrategies();
                handleIterator(context, new TraverserIterator(traversal), graph);
            } catch (Exception ex) {
                // look through the proxy wrapper when classifying the failure; the original exception is
                // still the one reported back and handed to onError()
                final Throwable cause = ex instanceof UndeclaredThrowableException ? ex.getCause() : ex;

                CloseableIterator.closeIterator(traversal);

                // if any exception in the chain is TemporaryException or Failure then we should respond with the
                // right error code so that the client knows to retry
                final Optional<Throwable> possibleSpecialException = determineIfSpecialException(ex);
                if (possibleSpecialException.isPresent()) {
                    writeSpecialExceptionResponse(context, msg, possibleSpecialException.get());
                } else if (cause instanceof InterruptedException || cause instanceof TraversalInterruptedException) {
                    // an interruption at this point is reported to the client as a timeout
                    graphManager.onQueryError(msg, cause);
                    final String errorMessage = String.format("A timeout occurred during traversal evaluation of [%s] - consider increasing the limit given to evaluationTimeout", msg);
                    logger.warn(errorMessage);
                    context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
                            .statusMessage(errorMessage)
                            .statusAttributeException(ex).create());
                } else {
                    logger.warn(String.format("Exception processing a Traversal on iteration for request [%s].", msg.getRequestId()), ex);
                    context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
                            .statusMessage(ex.getMessage())
                            .statusAttributeException(ex).create());
                }
                onError(graph, context, ex);
            }
        } catch (Throwable t) {
            // reached for failures in beforeProcessing(), for non-Exception throwables raised during
            // iteration, and for anything thrown by the inner error handling itself
            onError(graph, context, t);

            // if any exception in the chain is TemporaryException or Failure then we should respond with the
            // right error code so that the client knows to retry
            final Optional<Throwable> possibleSpecialException = determineIfSpecialException(t);
            if (possibleSpecialException.isPresent()) {
                writeSpecialExceptionResponse(context, msg, possibleSpecialException.get());
            } else {
                logger.warn(String.format("Exception processing a Traversal on request [%s].", msg.getRequestId()), t);
                context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
                        .statusMessage(t.getMessage())
                        .statusAttributeException(t).create());
                if (t instanceof Error) {
                    // Re-throw any errors to be handled by and set as the result of evalFuture
                    throw t;
                }
            }
        } finally {
            timerContext.stop();

            // There is a race condition that this query may have finished before the timeoutFuture was created,
            // though this is very unlikely. This is handled in the setter, if this has already been grabbed.
            // If we passed this point and the setter hasn't been called, it will cancel the timeoutFuture inside
            // the setter to compensate.
            final ScheduledFuture<?> timeoutFuture = context.getTimeoutExecutor();
            if (null != timeoutFuture)
                timeoutFuture.cancel(true);
        }

        return null;
    });

    try {
        final Future<?> executionFuture = context.getGremlinExecutor().getExecutorService().submit(evalFuture);
        if (timeoutMillis > 0) {
            // Schedule a timeout in the thread pool for future execution
            context.setTimeoutExecutor(context.getScheduledExecutorService().schedule(() -> {
                executionFuture.cancel(true);
                // if the task never got to start its response, the timeout is reported from here
                if (!context.getStartedResponse()) {
                    context.sendTimeoutResponse();
                }
            }, timeoutMillis, TimeUnit.MILLISECONDS));
        }
    } catch (RejectedExecutionException ree) {
        // the executor refused the task, which is surfaced to the client as rate limiting
        context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.TOO_MANY_REQUESTS)
                .statusMessage("Rate limiting").create());
    }
}

/**
 * Writes the error response for a "special" exception found in a failure's cause chain.
 * <p>
 * A {@link TemporaryException} is answered with {@code SERVER_ERROR_TEMPORARY}; a {@link Failure} is answered
 * with {@code SERVER_ERROR_FAIL_STEP} plus its formatted message as a status attribute. For any other type no
 * status code is set on the builder before the message is created and written.
 *
 * @param context the request context used to write the response
 * @param msg     the request being answered
 * @param special the exception whose message and details are reported to the client
 */
private static void writeSpecialExceptionResponse(final Context context, final RequestMessage msg,
                                                  final Throwable special) {
    final ResponseMessage.Builder specialResponseMsg = ResponseMessage.build(msg).
            statusMessage(special.getMessage()).
            statusAttributeException(special);
    if (special instanceof TemporaryException) {
        specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_TEMPORARY);
    } else if (special instanceof Failure) {
        final Failure failure = (Failure) special;
        specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_FAIL_STEP).
                statusAttribute(Tokens.STATUS_ATTRIBUTE_FAIL_STEP_MESSAGE, failure.format());
    }
    context.writeAndFlush(specialResponseMsg.create());
}