private void iterateBytecodeTraversal()

in server/src/main/java/com/jetbrains/youtrackdb/internal/server/plugin/gremlin/YTDBAbstractOpProcessor.java [759:919]


  private void iterateBytecodeTraversal(final Context context)
      throws Exception {
    final var msg = context.getRequestMessage();
    final var settings = context.getSettings();
    logger.debug("Traversal request {} for in thread {}", msg.getRequestId(),
        Thread.currentThread().getName());

    // validateTraversalRequest() ensures that this is of type Bytecode
    final var bytecodeObj = msg.getArgs().get(Tokens.ARGS_GREMLIN);
    final var bytecode = (Bytecode) bytecodeObj;

    // timeout override - handle both deprecated and newly named configuration. earlier logic should prevent
    // both configurations from being submitted at the same time
    final var args = msg.getArgs();
    final var seto = args.containsKey(Tokens.ARGS_EVAL_TIMEOUT) ?
        ((Number) args.get(Tokens.ARGS_EVAL_TIMEOUT)).longValue()
        : context.getSettings().getEvaluationTimeout();

    try {
      final var lambdaLanguage = BytecodeHelper.getLambdaLanguage(bytecode);
      if (lambdaLanguage.isPresent()) {
        var errorMsg = "Execution of lambada code is not allowed on server side.";
        throw new OpProcessorException("Traversal contains a lambda that cannot be compiled",
            ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
                .statusMessage(errorMsg)
                .create());
      }
    } catch (Exception ex) {
      logger.error("Could not deserialize the Traversal instance", ex);
      throw new OpProcessorException("Could not deserialize the Traversal instance",
          ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION)
              .statusMessage(ex.getMessage())
              .statusAttributeException(ex).create());
    }
    if (settings.enableAuditLog) {
      var user = context.getChannelHandlerContext().channel().attr(StateKey.AUTHENTICATED_USER)
          .get();
      if (null == user) {    // This is expected when using the AllowAllAuthenticator
        user = AuthenticatedUser.ANONYMOUS_USER;
      }
      var address = context.getChannelHandlerContext().channel().remoteAddress().toString();
      if (!address.isEmpty() && address.charAt(0) == '/' && address.length() > 1) {
        address = address.substring(1);
      }
      auditLogger.info("User {} with address {} requested: {}", user.getName(), address, bytecode);
    }

    var traversalSource = getTraversalSource(context);
    // handle bytecode based graph operations like commit/rollback commands
    if (BytecodeHelper.isGraphOperation(bytecode)) {
      handleGraphOperation(bytecode, traversalSource, context);
      return;
    }

    final var timerContext = traversalOpTimer.time();
    final var evalFuture = new FutureTask<Void>(() -> {
      context.setStartedResponse();
      try {
        var traversal = JavaTranslator.of(traversalSource).translate(bytecode);
        var ok = false;
        try {
          // compile the traversal - without it getEndStep() has nothing in it
          traversal.applyStrategies();
          handleIterator(context, new TraverserIterator(traversal), traversalSource);
          ok = true;
        } catch (Exception ex) {
          Throwable t = ex;
          if (ex instanceof UndeclaredThrowableException) {
            t = t.getCause();
          }

          CloseableIterator.closeIterator(traversal);

          // if any exception in the chain is TemporaryException or Failure then we should respond with the
          // right error code so that the client knows to retry
          final var possibleSpecialException = YTDBAbstractOpProcessor.determineIfSpecialException(
              ex);
          if (possibleSpecialException.isPresent()) {
            final var special = possibleSpecialException.get();
            final var specialResponseMsg = ResponseMessage.build(msg).
                statusMessage(special.getMessage()).
                statusAttributeException(special);
            if (special instanceof TemporaryException) {
              specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_TEMPORARY);
            } else if (special instanceof Failure failure) {
              specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_FAIL_STEP).
                  statusAttribute(Tokens.STATUS_ATTRIBUTE_FAIL_STEP_MESSAGE, failure.format());
            }
            context.writeAndFlush(specialResponseMsg.create());
          } else if (t instanceof InterruptedException
              || t instanceof TraversalInterruptedException) {

            final var errorMessage = String.format(
                "A timeout occurred during traversal evaluation of [%s] - consider increasing the limit given to evaluationTimeout",
                msg);
            logger.warn(errorMessage);
            context.writeAndFlush(
                ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
                    .statusMessage(errorMessage)
                    .statusAttributeException(ex).create());
          } else {
            logger.warn(
                String.format("Exception processing a Traversal on iteration for request [%s].",
                    msg.getRequestId()), ex);
            context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
                .statusMessage(ex.getMessage())
                .statusAttributeException(ex).create());
          }
        } finally {
          if (!ok) {
            var tx = traversalSource.tx();
            if (tx.isOpen()) {
              tx.rollback();
            }
          }
        }
      } catch (Throwable t) {
        // if any exception in the chain is TemporaryException or Failure then we should respond with the
        // right error code so that the client knows to retry
        final var possibleSpecialException = YTDBAbstractOpProcessor.determineIfSpecialException(t);
        if (possibleSpecialException.isPresent()) {
          final var special = possibleSpecialException.get();
          final var specialResponseMsg = ResponseMessage.build(msg).
              statusMessage(special.getMessage()).
              statusAttributeException(special);
          if (special instanceof TemporaryException) {
            specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_TEMPORARY);
          } else if (special instanceof Failure failure) {
            specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_FAIL_STEP).
                statusAttribute(Tokens.STATUS_ATTRIBUTE_FAIL_STEP_MESSAGE, failure.format());
          }
          context.writeAndFlush(specialResponseMsg.create());
        } else {
          logger.warn(String.format("Exception processing a Traversal on request [%s].",
              msg.getRequestId()), t);
          context.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR)
              .statusMessage(t.getMessage())
              .statusAttributeException(t).create());
          if (t instanceof Error) {
            //Re-throw any errors to be handled by and set as the result of evalFuture
            throw t;
          }
        }
      } finally {
        timerContext.stop();

        // There is a race condition that this query may have finished before the timeoutFuture was created,
        // though this is very unlikely. This is handled in the settor, if this has already been grabbed.
        // If we passed this point and the setter hasn't been called, it will cancel the timeoutFuture inside
        // the setter to compensate.
        final var timeoutFuture = context.getTimeoutExecutor();
        if (null != timeoutFuture) {
          timeoutFuture.cancel(true);
        }
      }

      return null;
    });

    submitToGremlinExecutor(context, traversalSource, seto, evalFuture);
  }