public Message askEngineConn()

in linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java [120:266]


  public Message askEngineConn(
      HttpServletRequest req, @RequestBody EngineAskRequest engineAskRequest)
      throws IOException, InterruptedException {
    // Asks for an EngineConn on behalf of the operating user.
    // - Unless the request carries the execute-once label, an existing engine is reused first.
    // - Otherwise (or when nothing could be reused) a new engine is created and marked as used.
    // On success the node is returned under "engine". A creation failure surfaced as
    // LinkisRetryException is reported with canRetry=true. InterruptedException propagates if the
    // thread is interrupted while backing off between reuse attempts.
    String userName = ModuleUserUtils.getOperationUser(req, "askEngineConn");
    engineAskRequest.setUser(userName);
    long timeout = engineAskRequest.getTimeOut();
    if (timeout <= 0) {
      // No usable timeout from the caller: fall back to the configured REST wait time.
      timeout = AMConfiguration.ENGINE_CONN_START_REST_MAX_WAIT_TIME.getValue().toLong();
      engineAskRequest.setTimeOut(timeout);
    }
    Map<String, Object> retEngineNode = new HashMap<>();
    logger.info(
        "User {} try to ask an engineConn with maxStartTime {}. EngineAskRequest is {}.",
        userName,
        ByteTimeUtils.msDurationToString(timeout),
        engineAskRequest);
    Sender sender = Sender.getSender(Sender.getThisServiceInstance());

    String taskId = JobUtils.getJobIdFromStringMap(engineAskRequest.getProperties());
    LoggerUtils.setJobIdMDC(taskId);
    // The job id is put into the logging MDC (presumably thread-bound state); clear it on every
    // exit path, including exceptions, so it cannot leak into later work on this thread.
    try {
      logger.info("received task : {}, engineAskRequest : {}", taskId, engineAskRequest);

      // try to reuse ec first
      if (!engineAskRequest.getLabels().containsKey(LabelKeyConstant.EXECUTE_ONCE_KEY)) {
        EngineNode reuseNode = tryReuseEngineForAsk(engineAskRequest, sender, taskId);
        if (null != reuseNode) {
          logger.info(
              "Finished to ask engine for task: {}, user: {} by reuse node {}",
              taskId,
              engineAskRequest.getUser(),
              reuseNode);
          fillResultEngineNode(retEngineNode, reuseNode);
          return Message.ok("reuse engineConn ended.").data("engine", retEngineNode);
        }
      }

      String engineAskAsyncId = EngineAskEngineService$.MODULE$.getAsyncId();
      EngineNode engineNode;
      try {
        engineNode =
            createEngineForAsk(engineAskRequest, sender, taskId, engineAskAsyncId, userName);
      } catch (LinkisRetryException retryException) {
        logger.error(
            "User {} create engineConn failed get retry  exception. can be Retry",
            userName,
            retryException);
        return Message.error(
                String.format(
                    "Create engineConn failed, caused by %s.",
                    ExceptionUtils.getRootCauseMessage(retryException)))
            .data("canRetry", true);
      } catch (Exception e) {
        logger.error("User {} create engineConn failed get retry  exception", userName, e);
        return Message.error(
            String.format(
                "Create engineConn failed, caused by %s.", ExceptionUtils.getRootCauseMessage(e)));
      }

      fillResultEngineNode(retEngineNode, engineNode);
      logger.info(
          "Finished to create a engineConn for user {}. NodeInfo is {}.", userName, engineNode);
      // to transform to a map
      return Message.ok("create engineConn ended.").data("engine", retEngineNode);
    } finally {
      LoggerUtils.removeJobIdMDC();
    }
  }

  /**
   * Tries to reuse an existing engine for {@code engineAskRequest}.
   *
   * <p>A {@link LinkisRetryException} from the reuse service is retried up to two more times
   * (three attempts in total), pausing one second between attempts; no pause follows the last
   * failed attempt. Any other exception ends the reuse attempt immediately.
   *
   * @param engineAskRequest the ask request whose labels, timeout, user and properties are copied
   *     into the reuse request
   * @param sender sender forwarded to the reuse service
   * @param taskId job id used in log messages
   * @return the reused engine node, or {@code null} when nothing could be reused
   * @throws InterruptedException if interrupted while waiting between attempts
   */
  private EngineNode tryReuseEngineForAsk(
      EngineAskRequest engineAskRequest, Sender sender, String taskId)
      throws InterruptedException {
    EngineReuseRequest engineReuseRequest = new EngineReuseRequest();
    engineReuseRequest.setLabels(engineAskRequest.getLabels());
    engineReuseRequest.setTimeOut(engineAskRequest.getTimeOut());
    engineReuseRequest.setUser(engineAskRequest.getUser());
    engineReuseRequest.setProperties(engineAskRequest.getProperties());
    final int maxRetry = 2;
    int failures = 0;
    while (true) {
      try {
        return engineReuseService.reuseEngine(engineReuseRequest, sender);
      } catch (LinkisRetryException e) {
        logger.error(
            "task: {}, user: {} reuse engine failed", taskId, engineReuseRequest.getUser(), e);
        failures += 1;
        if (failures > maxRetry) {
          // Out of retries: do not sleep again, just give up on reuse.
          return null;
        }
        Thread.sleep(1000);
      } catch (Exception e1) {
        logger.info(
            "task: {} user: {} reuse engine failed", taskId, engineReuseRequest.getUser(), e1);
        return null;
      }
    }
  }

  /**
   * Creates a new engine for {@code engineAskRequest} and marks it as used, on the caller's thread.
   *
   * <p>Any failure, including a created engine that {@code useEngine} returns as {@code null}, is
   * logged and rethrown as a {@link LinkisRetryException} carrying the original message.
   *
   * @param engineAskRequest the ask request; its "engineInstance" label is removed in place
   * @param sender sender forwarded to the create service
   * @param taskId job id used in log messages
   * @param engineAskAsyncId id included in the start log message
   * @param userName operating user, included in the failure log message
   * @return the node returned by {@code engineNodeManager.useEngine}; never {@code null}
   * @throws LinkisRetryException if creating or using the engine fails
   */
  private EngineNode createEngineForAsk(
      EngineAskRequest engineAskRequest,
      Sender sender,
      String taskId,
      String engineAskAsyncId,
      String userName)
      throws LinkisRetryException {
    logger.info(
        "Task: {}, start to async({}) createEngine: {}",
        taskId,
        engineAskAsyncId,
        engineAskRequest.getCreateService());
    // remove engineInstance label if exists
    engineAskRequest.getLabels().remove("engineInstance");
    EngineCreateRequest engineCreateRequest = new EngineCreateRequest();
    engineCreateRequest.setLabels(engineAskRequest.getLabels());
    engineCreateRequest.setTimeout(engineAskRequest.getTimeOut());
    engineCreateRequest.setUser(engineAskRequest.getUser());
    engineCreateRequest.setProperties(engineAskRequest.getProperties());
    engineCreateRequest.setCreateService(engineAskRequest.getCreateService());
    try {
      EngineNode createNode = engineCreateService.createEngine(engineCreateRequest, sender);
      // useEngine needs a timeout; fall back to the configured engine start limit when the
      // request does not carry a positive one.
      long useTimeout;
      if (engineCreateRequest.getTimeout() <= 0) {
        useTimeout = AMConfiguration.ENGINE_START_MAX_TIME.getValue().toLong();
      } else {
        useTimeout = engineCreateRequest.getTimeout();
      }
      EngineNode createEngineNode = engineNodeManager.useEngine(createNode, useTimeout);
      if (null == createEngineNode) {
        Object createdInstance = createNode == null ? null : createNode.getServiceInstance();
        throw new LinkisRetryException(
            AMConstant.EM_ERROR_CODE,
            "create engine " + createdInstance + " success, but to use engine failed");
      }
      logger.info(
          "Task: {} finished to ask engine for user {} by create node {}",
          taskId,
          engineAskRequest.getUser(),
          createEngineNode);
      return createEngineNode;
    } catch (Exception e) {
      logger.error(
          "Task: {} failed to ask engine for user {} by create node", taskId, userName, e);
      throw new LinkisRetryException(AMConstant.EM_ERROR_CODE, e.getMessage());
    }
  }