override def reuseEngine()

in linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala [89:268]


  override def reuseEngine(engineReuseRequest: EngineReuseRequest, sender: Sender): EngineNode = {
    val taskId = JobUtils.getJobIdFromStringMap(engineReuseRequest.getProperties)
    logger.info(s"Task $taskId Start to reuse Engine for request: $engineReuseRequest")
    val labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
    val labelList = LabelUtils
      .distinctLabel(
        labelBuilderFactory.getLabels(engineReuseRequest.getLabels),
        userLabelService.getUserLabels(engineReuseRequest.getUser)
      )
      .asScala

    val exclusionInstances: Array[String] =
      labelList.find(_.isInstanceOf[ReuseExclusionLabel]) match {
        case Some(l) =>
          l.asInstanceOf[ReuseExclusionLabel].getInstances
        case None =>
          Array.empty[String]
      }

    if (
        exclusionInstances.length == 1 && exclusionInstances(
          0
        ) == GovernanceCommonConf.WILDCARD_CONSTANT
    ) {
      logger.info(
        s"Task $taskId exists ReuseExclusionLabel and the configuration does not choose to reuse EC"
      )
      return null
    }

    var filterLabelList = labelList.filter(_.isInstanceOf[EngineNodeLabel]).asJava

    val engineConnAliasLabel = labelBuilderFactory.createLabel(classOf[AliasServiceInstanceLabel])
    engineConnAliasLabel.setAlias(GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue)
    filterLabelList.add(engineConnAliasLabel)

    // label chooser
    if (null != engineReuseLabelChoosers) {
      engineReuseLabelChoosers.asScala.foreach { chooser =>
        filterLabelList = chooser.chooseLabels(filterLabelList)
      }
    }

    val instances = nodeLabelService.getScoredNodeMapsByLabels(filterLabelList)

    if (null != instances && null != exclusionInstances && exclusionInstances.nonEmpty) {
      val instancesKeys = instances.asScala.keys.toArray
      instancesKeys
        .filter { instance =>
          exclusionInstances.exists(_.equalsIgnoreCase(instance.getServiceInstance.getInstance))
        }
        .foreach { instance =>
          logger.info(
            s"will  be not reuse ${instance.getServiceInstance}, cause use exclusion label"
          )
          instances.remove(instance)
        }
    }
    if (null == instances || instances.isEmpty) {
      throw new LinkisRetryException(
        AMConstant.ENGINE_ERROR_CODE,
        s"No engine can be reused, cause from db is null"
      )
    }
    var engineScoreList =
      getEngineNodeManager.getEngineNodes(instances.asScala.keys.toSeq.toArray)

    // 获取需要的资源
    if (AMConfiguration.EC_REUSE_WITH_RESOURCE_RULE_ENABLE) {
      val labels: util.List[Label[_]] =
        engineCreateService.buildLabel(engineReuseRequest.getLabels, engineReuseRequest.getUser)
      if (engineReuseRequest.getProperties == null) {
        engineReuseRequest.setProperties(new util.HashMap[String, String]())
      }

      val engineType: String = LabelUtil.getEngineType(labels)
      if (
          StringUtils.isNotBlank(engineType) && AMConfiguration.EC_REUSE_WITH_RESOURCE_WITH_ECS
            .contains(engineType.toLowerCase())
      ) {
        val resource = engineCreateService.generateResource(
          engineReuseRequest.getProperties,
          engineReuseRequest.getUser,
          labelFilter.choseEngineLabel(labels),
          AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong
        )

        // 过滤掉资源不满足的引擎
        engineScoreList = engineScoreList
          .filter(engine => engine.getNodeStatus == NodeStatus.Unlock)
          .filter(engine => {
            if (engine.getNodeResource.getUsedResource != null) {
              // 引擎资源只有满足需要的资源才复用
              engine.getNodeResource.getUsedResource.notLess(resource.getMaxResource)
            } else {
              // 引擎正在启动中,比较锁住的资源,最终是否复用沿用之前复用逻辑
              engine.getNodeResource.getLockedResource.notLess(resource.getMaxResource)
            }
          })
      }

      if (engineScoreList.isEmpty) {
        throw new LinkisRetryException(
          AMConstant.ENGINE_ERROR_CODE,
          s"No engine can be reused, cause all engine resources are not sufficient."
        )
      }
    }

    var engine: EngineNode = null
    var count = 1
    val timeout =
      if (engineReuseRequest.getTimeOut <= 0) {
        AMConfiguration.ENGINE_REUSE_MAX_TIME.getValue.toLong
      } else engineReuseRequest.getTimeOut
    val reuseLimit: Int =
      if (engineReuseRequest.getReuseCount <= 0) AMConfiguration.ENGINE_REUSE_COUNT_LIMIT
      else engineReuseRequest.getReuseCount

    def selectEngineToReuse: Boolean = {
      if (count > reuseLimit) {
        throw new LinkisRetryException(
          AMConstant.ENGINE_ERROR_CODE,
          s"Engine reuse exceeds limit: $reuseLimit"
        )
      }
      val choseNode = nodeSelector.choseNode(engineScoreList.toArray)
      if (choseNode.isEmpty) {
        throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, "No engine can be reused")
      }
      val engineNode = choseNode.get.asInstanceOf[EngineNode]
      logger.info(s"prepare to reuse engineNode: ${engineNode.getServiceInstance}")
      engine = Utils.tryCatch(getEngineNodeManager.reuseEngine(engineNode)) { t: Throwable =>
        logger.info(s"Failed to reuse engine ${engineNode.getServiceInstance}", t)
        if (ExceptionUtils.getRootCause(t).isInstanceOf[TimeoutException]) {
          logger.info(s"Failed to reuse ${engineNode.getServiceInstance}, now to stop this")
          val stopEngineRequest =
            new EngineStopRequest(engineNode.getServiceInstance, ManagerUtils.getAdminUser)
          engineStopService.asyncStopEngine(stopEngineRequest)
        }
        null
      }
      if (null == engine) {
        count = count + 1
        engineScoreList = engineScoreList.filter(!_.equals(choseNode.get))
      }
      null != engine
    }

    val startTime = System.currentTimeMillis()
    try {
      Utils.waitUntil(() => selectEngineToReuse, Duration(timeout, TimeUnit.MILLISECONDS))
    } catch {
      case e: TimeoutException =>
        throw new LinkisRetryException(
          AMConstant.ENGINE_ERROR_CODE,
          s"Waiting for Engine initialization failure, already waiting $timeout ms"
        )
      case t: Throwable =>
        logger.info(
          s"Failed to reuse engineConn time taken ${System.currentTimeMillis() - startTime}"
        )
        throw t
    }
    logger.info(
      s"Finished to reuse Engine for request: $engineReuseRequest get EngineNode $engine, time taken ${System
        .currentTimeMillis() - startTime}"
    )
    val engineServiceLabelList =
      instances.asScala.filter(kv => kv._1.getServiceInstance.equals(engine.getServiceInstance))
    if (null != engineServiceLabelList && engineServiceLabelList.nonEmpty) {
      engine.setLabels(engineServiceLabelList.head._2)
    } else {
      logger.info(
        "Get choosen engineNode : " + AMUtils.GSON
          .toJson(engine) + " from engineLabelMap : " + AMUtils.GSON.toJson(instances)
      )
    }
    engine
  }