override def askEngine()

in linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala [91:275]


  /**
   * Handles an engine ask request asynchronously.
   *
   * The configured hooks (if any) run first; the first hook that throws blocks the request and
   * an `EngineCreateError` is returned immediately. Otherwise a `Future` is started that tries to
   * reuse an existing engine (skipped when the labels contain
   * `LabelKeyConstant.EXECUTE_ONCE_KEY`) and falls back to creating a new one. When the `Future`
   * completes, the outcome is pushed to `sender` as `EngineCreateSuccess` or `EngineCreateError`.
   *
   * @param engineAskRequest
   *   the ask request; when a new engine has to be created its label map is mutated (the
   *   engine-instance label is removed and the driver-task label is added)
   * @param sender
   *   receiver of the asynchronous result; when it is null the final result is only logged
   * @return
   *   the `EngineCreateError` produced by a failing hook, otherwise an `EngineAskAsyncResponse`
   *   carrying the async id that the later success/error message refers to
   */
  override def askEngine(engineAskRequest: EngineAskRequest, sender: Sender): Any = {
    // Single place deciding whether a failure may be retried by the caller: retry exceptions
    // themselves, or anything whose root cause is a (socket) timeout.
    def canRetry(t: Throwable): Boolean = t match {
      case _: LinkisRetryException => true
      case _: RetryableException => true
      case _ =>
        ExceptionUtils.getRootCause(t) match {
          case _: SocketTimeoutException => true
          case _: TimeoutException => true
          case _ => false
        }
    }

    val taskId = JobUtils.getJobIdFromStringMap(engineAskRequest.getProperties)
    LoggerUtils.setJobIdMDC(taskId)
    logger.info(s"received task: $taskId, engineAskRequest $engineAskRequest")
    if (hooksArray != null && hooksArray.size > 0) {
      val ctx = new AskEngineConnHookContext(engineAskRequest, sender)

      // Throwing an exception in a hook blocks the request. The iterator is lazy, so hooks
      // after the first failing one are not executed.
      val hookFailure = hooksArray.iterator
        .map { hook =>
          Utils.tryCatch {
            hook.doHook(ctx)
            Option.empty[Any]
          } { t =>
            val failedAsyncId = getAsyncId
            val retryFlag = canRetry(t)
            Some(EngineCreateError(failedAsyncId, ExceptionUtils.getRootCauseMessage(t), retryFlag))
          }
        }
        .collectFirst { case Some(error) => error }

      hookFailure match {
        case Some(error) =>
          // Do not leave the job id on the calling thread when returning early.
          LoggerUtils.removeJobIdMDC()
          return error
        case None =>
      }
    }

    val engineAskAsyncId = getAsyncId
    val createNodeThread = Future {
      LoggerUtils.setJobIdMDC(taskId)
      // NOTE(review): if tryAndWarn yields null when getKeyAndSemaphore fails, this tuple
      // destructuring would throw — confirm against Utils.tryAndWarn.
      val (engineCreateKey, semaphore) =
        Utils.tryAndWarn(getKeyAndSemaphore(engineAskRequest.getLabels))
      // Remember whether the permit was really taken: releasing a permit that was never
      // acquired would silently raise the semaphore's limit.
      var permitAcquired = false
      Utils.tryFinally {
        if (null != semaphore) {
          try {
            semaphore.acquire()
            permitAcquired = true
            logger.info(s"$engineCreateKey succeed to get lock")
          } catch {
            case e: Exception =>
              // Best effort: continue without the permit.
              logger.warn(
                s"Task: $taskId user ${engineAskRequest.getUser} acquire semaphore failed",
                e
              )
          }
        }

        // Try to reuse an engine unless the request is marked as execute-once.
        val reuseNode: EngineNode =
          if (engineAskRequest.getLabels.containsKey(LabelKeyConstant.EXECUTE_ONCE_KEY)) {
            null
          } else {
            val engineReuseRequest = new EngineReuseRequest()
            engineReuseRequest.setLabels(engineAskRequest.getLabels)
            engineReuseRequest.setTimeOut(engineAskRequest.getTimeOut)
            engineReuseRequest.setUser(engineAskRequest.getUser)
            engineReuseRequest.setProperties(engineAskRequest.getProperties)
            Utils.tryCatch(engineReuseService.reuseEngine(engineReuseRequest, sender)) {
              t: Throwable =>
                // A failed reuse is not fatal: log it and fall through to engine creation.
                t match {
                  case _: LinkisRetryException =>
                    logger.info(
                      s"Task: $taskId user ${engineAskRequest.getUser} reuse engine failed ${t.getMessage}"
                    )
                  case _ =>
                    logger.info(
                      s"Task: $taskId user ${engineAskRequest.getUser} reuse engine failed",
                      t
                    )
                }
                null
            }
          }

        if (null != reuseNode) {
          logger.info(
            s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by reuse node $reuseNode"
          )
          (reuseNode, true)
        } else {
          logger.info(
            s"Task: $taskId start to async($engineAskAsyncId) createEngine, ${engineAskRequest.getCreateService}"
          )
          // If the original labels contain engineInstance, remove it first
          engineAskRequest.getLabels.remove(LabelKeyConstant.ENGINE_INSTANCE_KEY)
          // Add the label carrying the id of the task that drives the engine start
          val labels: util.Map[String, AnyRef] = engineAskRequest.getLabels
          labels.put(LabelKeyConstant.DRIVER_TASK_KEY, taskId)

          val engineCreateRequest = new EngineCreateRequest
          engineCreateRequest.setLabels(engineAskRequest.getLabels)
          engineCreateRequest.setTimeout(engineAskRequest.getTimeOut)
          engineCreateRequest.setUser(engineAskRequest.getUser)
          engineCreateRequest.setProperties(engineAskRequest.getProperties)
          engineCreateRequest.setCreateService(engineAskRequest.getCreateService)

          val createNode = engineCreateService.createEngine(engineCreateRequest, sender)
          // A non-positive request timeout falls back to the configured maximum start time.
          val timeout =
            if (engineCreateRequest.getTimeout <= 0) {
              AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong
            } else engineCreateRequest.getTimeout
          // useEngine requires a timeout
          val createEngineNode = getEngineNodeManager.useEngine(createNode, timeout)
          if (null == createEngineNode) {
            throw new LinkisRetryException(
              AMConstant.EM_ERROR_CODE,
              s"create engine${createNode.getServiceInstance} success, but to use engine failed"
            )
          }
          logger.info(
            s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by create node $createEngineNode"
          )
          (createEngineNode, false)
        }
      } {
        Utils.tryAndWarn {
          if (permitAcquired) {
            semaphore.release()
            logger.info(s"$engineCreateKey succeed to relaese lock")
          }
        }
        LoggerUtils.removeJobIdMDC()
      }

    }

    createNodeThread.onComplete {
      case Success((engineNode, isReuse)) =>
        LoggerUtils.setJobIdMDC(taskId)
        Utils.tryFinally {
          if (isReuse) {
            logger.info(
              s"Task: $taskId Success to async($engineAskAsyncId) reuseEngine $engineNode"
            )
          } else {
            logger.info(
              s"Task: $taskId Success to async($engineAskAsyncId) createEngine $engineNode"
            )
          }
          if (null != sender) {
            sender.send(EngineCreateSuccess(engineAskAsyncId, engineNode, isReuse))
          } else {
            logger.info("Will not send async useing null sender.")
          }
        } {
          LoggerUtils.removeJobIdMDC()
        }
      case Failure(exception) =>
        LoggerUtils.setJobIdMDC(taskId)
        val retryFlag = canRetry(exception)
        val msg =
          s"Task: $taskId Failed  to async($engineAskAsyncId) create/reuse Engine, can Retry $retryFlag"
        // Retryable failures are expected, so only the exception class is logged for them.
        if (!retryFlag) {
          logger.info(msg, exception)
        } else {
          logger.info(s"msg: ${msg} canRetry Exception: ${exception.getClass.getName}")
        }

        Utils.tryFinally {
          // Same null guard as the success path; a null sender must not raise an NPE here.
          if (null != sender) {
            sender.send(
              EngineCreateError(
                engineAskAsyncId,
                ExceptionUtils.getRootCauseMessage(exception),
                retryFlag
              )
            )
          } else {
            logger.info("Will not send async useing null sender.")
          }
        } {
          LoggerUtils.removeJobIdMDC()
        }
    }
    LoggerUtils.removeJobIdMDC()
    EngineAskAsyncResponse(engineAskAsyncId, Sender.getThisServiceInstance)
  }