in interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java [158:340]
protected void evalOpInternal(
final Context ctx,
final Supplier<GremlinExecutor> gremlinExecutorSupplier,
final AbstractEvalOpProcessor.BindingSupplier bindingsSupplier) {
RequestMessage msg = ctx.getRequestMessage();
GremlinExecutor gremlinExecutor = gremlinExecutorSupplier.get();
Map<String, Object> args = msg.getArgs();
String script = (String) args.get("gremlin");
Map<String, Object> bindings =
args.get("bindings") == null ? null : (Map<String, Object>) args.get("bindings");
String upTraceId =
(bindings == null || bindings.get("X-Trace-ID") == null)
? null
: String.valueOf(bindings.get("X-Trace-ID"));
String defaultValidateQuery = "''";
// ad-hoc handling for connection validation
if (script.equals(defaultValidateQuery)) {
ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SUCCESS).create());
return;
}
BigInteger jobId = idGenerator.generateId();
String jobName = idGenerator.generateName(jobId);
String language = FrontendConfig.GREMLIN_SCRIPT_LANGUAGE_NAME.get(configs);
IrMeta irMeta =
ClassUtils.callExceptionWithDetails(
() -> metaQueryCallback.beforeExec(),
Code.META_SCHEMA_NOT_READY,
Map.of("QueryId", jobId));
// If the current graph schema is empty (as service startup can occur before data loading in
// Groot), we temporarily switch to the original IR core.
// In the future, once schema-free support is implemented, we will replace this temporary
// solution.
if (irMeta.getSchema().getVertexList().isEmpty()
&& irMeta.getSchema().getEdgeList().isEmpty()) {
language = AntlrGremlinScriptEngineFactory.LANGUAGE_NAME;
}
QueryStatusCallback statusCallback =
ClassUtils.createQueryStatusCallback(
jobId,
upTraceId,
script,
new MetricsCollector.Gremlin(evalOpTimer),
queryHistogram,
configs);
statusCallback
.getQueryLogger()
.info("[query][received]: query received from the gremlin client");
QueryTimeoutConfig timeoutConfig = new QueryTimeoutConfig(ctx.getRequestTimeout());
GremlinExecutor.LifeCycle lifeCycle;
switch (language) {
case AntlrGremlinScriptEngineFactory.LANGUAGE_NAME:
lifeCycle =
createLifeCycle(
ctx,
gremlinExecutorSupplier,
bindingsSupplier,
irMeta,
statusCallback,
timeoutConfig);
break;
case GremlinCalciteScriptEngineFactory.LANGUAGE_NAME:
lifeCycle =
new LifeCycleSupplier(
configs,
ctx,
queryCache,
graphPlanner,
executionClient,
jobId,
jobName,
irMeta,
statusCallback,
timeoutConfig)
.get();
break;
default:
throw new IllegalArgumentException("invalid script language name: " + language);
}
try {
CompletableFuture<Object> evalFuture =
gremlinExecutor.eval(script, language, new SimpleBindings(), lifeCycle);
evalFuture.handle(
(v, t) -> {
metaQueryCallback.afterExec(irMeta);
if (t instanceof FrontendException) {
((FrontendException) t).getDetails().put("QueryId", jobId);
}
// TimeoutException has been handled in ResultProcessor, skip it here
if (t != null && !(t instanceof TimeoutException)) {
statusCallback.onErrorEnd(t.getMessage());
Optional<Throwable> possibleTemporaryException =
determineIfTemporaryException(t);
if (possibleTemporaryException.isPresent()) {
ctx.writeAndFlush(
ResponseMessage.build(msg)
.code(ResponseStatusCode.SERVER_ERROR_TEMPORARY)
.statusMessage(
((Throwable)
possibleTemporaryException
.get())
.getMessage())
.statusAttributeException(
(Throwable)
possibleTemporaryException.get())
.create());
} else if (t instanceof OpProcessorException) {
ctx.writeAndFlush(((OpProcessorException) t).getResponseMessage());
} else {
String errorMessage;
if (t instanceof TimedInterruptTimeoutException) {
errorMessage =
String.format(
"A timeout occurred within the script during"
+ " evaluation of [%s] - consider"
+ " increasing the limit given to"
+ " TimedInterruptCustomizerProvider",
msg);
statusCallback.getQueryLogger().warn(errorMessage);
ctx.writeAndFlush(
ResponseMessage.build(msg)
.code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
.statusMessage(
"Timeout during script evaluation"
+ " triggered by"
+ " TimedInterruptCustomizerProvider")
.statusAttributeException(t)
.create());
} else if (t instanceof MultipleCompilationErrorsException
&& t.getMessage().contains("Method too large")
&& ((MultipleCompilationErrorsException) t)
.getErrorCollector()
.getErrorCount()
== 1) {
errorMessage =
String.format(
"The Gremlin statement that was submitted"
+ " exceeds the maximum compilation size"
+ " allowed by the JVM, please split it"
+ " into multiple smaller statements - %s",
msg);
statusCallback.getQueryLogger().warn(errorMessage);
ctx.writeAndFlush(
ResponseMessage.build(msg)
.code(
ResponseStatusCode
.SERVER_ERROR_EVALUATION)
.statusMessage(errorMessage)
.statusAttributeException(t)
.create());
} else {
errorMessage =
t.getMessage() == null ? t.toString() : t.getMessage();
statusCallback
.getQueryLogger()
.warn(
String.format(
"Exception processing a script on"
+ " request [%s].",
msg),
t);
ctx.writeAndFlush(
ResponseMessage.build(msg)
.code(
ResponseStatusCode
.SERVER_ERROR_EVALUATION)
.statusMessage(errorMessage)
.statusAttributeException(t)
.create());
}
}
}
return null;
});
} catch (RejectedExecutionException var17) {
statusCallback.getQueryLogger().error(var17);
ctx.writeAndFlush(
ResponseMessage.build(msg)
.code(ResponseStatusCode.TOO_MANY_REQUESTS)
.statusMessage(var17.getMessage())
.create());
}
}