in gremlin-groovy/src/main/java/org/apache/tinkerpop/gremlin/groovy/engine/GremlinExecutor.java [286:393]
public CompletableFuture<Object> eval(final Object gremlin, final String language, final Bindings boundVars, final LifeCycle lifeCycle) {
final String lang = Optional.ofNullable(language).orElse("gremlin-groovy");
if (logger.isDebugEnabled()) {
logger.debug("Preparing to evaluate script - {} - in thread [{}]", gremlin, Thread.currentThread().getName());
}
final Bindings bindings = new SimpleBindings();
bindings.putAll(globalBindings);
bindings.putAll(boundVars);
// override the timeout if the lifecycle has a value assigned. if the script contains with(timeout)
// options then allow that value to override what's provided on the lifecycle
final Optional<Long> timeoutDefinedInScript = gremlin instanceof String
? GremlinScriptChecker.parse((String) gremlin).getTimeout()
: Optional.empty();
final long scriptEvalTimeOut = timeoutDefinedInScript.orElse(
lifeCycle.getEvaluationTimeoutOverride().orElse(evaluationTimeout));
final CompletableFuture<Object> evaluationFuture = new CompletableFuture<>();
final FutureTask<Void> evalFuture = new FutureTask<>(() -> {
try {
lifeCycle.getBeforeEval().orElse(beforeEval).accept(bindings);
logger.debug("Evaluating script - {} - in thread [{}]", gremlin, Thread.currentThread().getName());
Object o;
if (gremlin instanceof String) {
o = gremlinScriptEngineManager.getEngineByName(lang).eval((String) gremlin, bindings);
}
else if (gremlin instanceof Bytecode) {
final Bytecode bytecode = (Bytecode) gremlin;
final Traversal.Admin<?, ?> traversal = eval(
bytecode, bindings, BytecodeHelper.getLambdaLanguage(bytecode).orElse("gremlin-groovy"), "g");
o = IteratorUtils.asList(traversal);
}
else {
throw new IllegalArgumentException("Invalid gremlin type.");
}
// apply a transformation before sending back the result - useful when trying to force serialization
// in the same thread that the eval took place given ThreadLocal nature of graphs as well as some
// transactional constraints
final Object result = lifeCycle.getTransformResult().isPresent() ?
lifeCycle.getTransformResult().get().apply(o) : o;
// a mechanism for taking the final result and doing something with it in the same thread, but
// AFTER the eval and transform are done and that future completed. this provides a final means
// for working with the result in the same thread as it was eval'd
if (lifeCycle.getWithResult().isPresent()) lifeCycle.getWithResult().get().accept(result);
lifeCycle.getAfterSuccess().orElse(afterSuccess).accept(bindings);
// the evaluationFuture must be completed after all processing as an exception in lifecycle events
// that must raise as an exception to the caller who has the returned evaluationFuture. in other words,
// if it occurs before this point, then the handle() method won't be called again if there is an
// exception that ends up below trying to completeExceptionally()
evaluationFuture.complete(result);
} catch (Throwable ex) {
final Throwable root = null == ex.getCause() ? ex : ExceptionUtils.getRootCause(ex);
// thread interruptions will typically come as the result of a timeout, so in those cases,
// check for that situation and convert to TimeoutException
if (root instanceof InterruptedException
|| root instanceof TraversalInterruptedException
|| root instanceof InterruptedIOException) {
lifeCycle.getAfterTimeout().orElse(afterTimeout).accept(bindings, root);
evaluationFuture.completeExceptionally(new TimeoutException(
String.format("Evaluation exceeded the configured 'evaluationTimeout' threshold of %s ms or evaluation was otherwise cancelled directly for request [%s]: %s", scriptEvalTimeOut, gremlin, root.getMessage())));
} else {
lifeCycle.getAfterFailure().orElse(afterFailure).accept(bindings, root);
evaluationFuture.completeExceptionally(root);
}
}
return null;
});
final WeakReference<CompletableFuture<Object>> evaluationFutureRef = new WeakReference<>(evaluationFuture);
final Future<?> executionFuture = executorService.submit(evalFuture);
if (scriptEvalTimeOut > 0) {
// Schedule a timeout in the thread pool for future execution
final ScheduledFuture<?> sf = scheduledExecutorService.schedule(() -> {
if (executionFuture.cancel(true)) {
final CompletableFuture<Object> ef = evaluationFutureRef.get();
if (ef != null) {
ef.completeExceptionally(new TimeoutException(
String.format("Evaluation exceeded the configured 'evaluationTimeout' threshold of %s ms or evaluation was otherwise cancelled directly for request [%s]", scriptEvalTimeOut, gremlin)));
}
}
}, scriptEvalTimeOut, TimeUnit.MILLISECONDS);
// Cancel the scheduled timeout if the eval future is complete or the script evaluation failed with exception
evaluationFuture.handleAsync((v, t) -> {
if (!sf.isDone()) {
logger.debug("Killing scheduled timeout on script evaluation - {} - as the eval completed (possibly with exception).", gremlin);
sf.cancel(true);
}
// no return is necessary - nothing downstream is concerned with what happens in here
return null;
}, scheduledExecutorService);
}
return evaluationFuture;
}