protected void evalOpInternal()

in interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/IrStandardOpProcessor.java [158:340]


    protected void evalOpInternal(
            final Context ctx,
            final Supplier<GremlinExecutor> gremlinExecutorSupplier,
            final AbstractEvalOpProcessor.BindingSupplier bindingsSupplier) {
        RequestMessage msg = ctx.getRequestMessage();
        GremlinExecutor gremlinExecutor = gremlinExecutorSupplier.get();
        Map<String, Object> args = msg.getArgs();
        String script = (String) args.get("gremlin");
        Map<String, Object> bindings =
                args.get("bindings") == null ? null : (Map<String, Object>) args.get("bindings");
        String upTraceId =
                (bindings == null || bindings.get("X-Trace-ID") == null)
                        ? null
                        : String.valueOf(bindings.get("X-Trace-ID"));

        String defaultValidateQuery = "''";
        // ad-hoc handling for connection validation
        if (script.equals(defaultValidateQuery)) {
            ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SUCCESS).create());
            return;
        }
        BigInteger jobId = idGenerator.generateId();
        String jobName = idGenerator.generateName(jobId);
        String language = FrontendConfig.GREMLIN_SCRIPT_LANGUAGE_NAME.get(configs);
        IrMeta irMeta =
                ClassUtils.callExceptionWithDetails(
                        () -> metaQueryCallback.beforeExec(),
                        Code.META_SCHEMA_NOT_READY,
                        Map.of("QueryId", jobId));
        // If the current graph schema is empty (as service startup can occur before data loading in
        // Groot), we temporarily switch to the original IR core.
        // In the future, once schema-free support is implemented, we will replace this temporary
        // solution.
        if (irMeta.getSchema().getVertexList().isEmpty()
                && irMeta.getSchema().getEdgeList().isEmpty()) {
            language = AntlrGremlinScriptEngineFactory.LANGUAGE_NAME;
        }
        QueryStatusCallback statusCallback =
                ClassUtils.createQueryStatusCallback(
                        jobId,
                        upTraceId,
                        script,
                        new MetricsCollector.Gremlin(evalOpTimer),
                        queryHistogram,
                        configs);
        statusCallback
                .getQueryLogger()
                .info("[query][received]: query received from the gremlin client");
        QueryTimeoutConfig timeoutConfig = new QueryTimeoutConfig(ctx.getRequestTimeout());
        GremlinExecutor.LifeCycle lifeCycle;
        switch (language) {
            case AntlrGremlinScriptEngineFactory.LANGUAGE_NAME:
                lifeCycle =
                        createLifeCycle(
                                ctx,
                                gremlinExecutorSupplier,
                                bindingsSupplier,
                                irMeta,
                                statusCallback,
                                timeoutConfig);
                break;
            case GremlinCalciteScriptEngineFactory.LANGUAGE_NAME:
                lifeCycle =
                        new LifeCycleSupplier(
                                        configs,
                                        ctx,
                                        queryCache,
                                        graphPlanner,
                                        executionClient,
                                        jobId,
                                        jobName,
                                        irMeta,
                                        statusCallback,
                                        timeoutConfig)
                                .get();
                break;
            default:
                throw new IllegalArgumentException("invalid script language name: " + language);
        }
        try {
            CompletableFuture<Object> evalFuture =
                    gremlinExecutor.eval(script, language, new SimpleBindings(), lifeCycle);
            evalFuture.handle(
                    (v, t) -> {
                        metaQueryCallback.afterExec(irMeta);
                        if (t instanceof FrontendException) {
                            ((FrontendException) t).getDetails().put("QueryId", jobId);
                        }
                        // TimeoutException has been handled in ResultProcessor, skip it here
                        if (t != null && !(t instanceof TimeoutException)) {
                            statusCallback.onErrorEnd(t.getMessage());
                            Optional<Throwable> possibleTemporaryException =
                                    determineIfTemporaryException(t);
                            if (possibleTemporaryException.isPresent()) {
                                ctx.writeAndFlush(
                                        ResponseMessage.build(msg)
                                                .code(ResponseStatusCode.SERVER_ERROR_TEMPORARY)
                                                .statusMessage(
                                                        ((Throwable)
                                                                        possibleTemporaryException
                                                                                .get())
                                                                .getMessage())
                                                .statusAttributeException(
                                                        (Throwable)
                                                                possibleTemporaryException.get())
                                                .create());
                            } else if (t instanceof OpProcessorException) {
                                ctx.writeAndFlush(((OpProcessorException) t).getResponseMessage());
                            } else {
                                String errorMessage;
                                if (t instanceof TimedInterruptTimeoutException) {
                                    errorMessage =
                                            String.format(
                                                    "A timeout occurred within the script during"
                                                            + " evaluation of [%s] - consider"
                                                            + " increasing the limit given to"
                                                            + " TimedInterruptCustomizerProvider",
                                                    msg);
                                    statusCallback.getQueryLogger().warn(errorMessage);
                                    ctx.writeAndFlush(
                                            ResponseMessage.build(msg)
                                                    .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
                                                    .statusMessage(
                                                            "Timeout during script evaluation"
                                                                + " triggered by"
                                                                + " TimedInterruptCustomizerProvider")
                                                    .statusAttributeException(t)
                                                    .create());
                                } else if (t instanceof MultipleCompilationErrorsException
                                        && t.getMessage().contains("Method too large")
                                        && ((MultipleCompilationErrorsException) t)
                                                        .getErrorCollector()
                                                        .getErrorCount()
                                                == 1) {
                                    errorMessage =
                                            String.format(
                                                    "The Gremlin statement that was submitted"
                                                        + " exceeds the maximum compilation size"
                                                        + " allowed by the JVM, please split it"
                                                        + " into multiple smaller statements - %s",
                                                    msg);
                                    statusCallback.getQueryLogger().warn(errorMessage);
                                    ctx.writeAndFlush(
                                            ResponseMessage.build(msg)
                                                    .code(
                                                            ResponseStatusCode
                                                                    .SERVER_ERROR_EVALUATION)
                                                    .statusMessage(errorMessage)
                                                    .statusAttributeException(t)
                                                    .create());
                                } else {
                                    errorMessage =
                                            t.getMessage() == null ? t.toString() : t.getMessage();
                                    statusCallback
                                            .getQueryLogger()
                                            .warn(
                                                    String.format(
                                                            "Exception processing a script on"
                                                                    + " request [%s].",
                                                            msg),
                                                    t);
                                    ctx.writeAndFlush(
                                            ResponseMessage.build(msg)
                                                    .code(
                                                            ResponseStatusCode
                                                                    .SERVER_ERROR_EVALUATION)
                                                    .statusMessage(errorMessage)
                                                    .statusAttributeException(t)
                                                    .create());
                                }
                            }
                        }
                        return null;
                    });
        } catch (RejectedExecutionException var17) {
            statusCallback.getQueryLogger().error(var17);
            ctx.writeAndFlush(
                    ResponseMessage.build(msg)
                            .code(ResponseStatusCode.TOO_MANY_REQUESTS)
                            .statusMessage(var17.getMessage())
                            .create());
        }
    }