in linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala [89:268]
/**
 * Tries to serve the request with an already running engine instead of creating a new one.
 *
 * The method works in four stages:
 *   1. merge the request labels with the user's labels and honour a `ReuseExclusionLabel`;
 *   1. look up candidate instances by label and drop the explicitly excluded ones;
 *   1. when `AMConfiguration.EC_REUSE_WITH_RESOURCE_RULE_ENABLE` is on and the engine type is
 *      listed in `AMConfiguration.EC_REUSE_WITH_RESOURCE_WITH_ECS`, keep only unlocked engines
 *      whose resource is not less than the max resource generated for the request;
 *   1. repeatedly let `nodeSelector` pick a candidate and try to reuse it until one succeeds,
 *      the attempt limit is exceeded, or the timeout elapses.
 *
 * @param engineReuseRequest
 *   labels, user, properties, timeout and reuse-count limit of the reuse attempt. Its
 *   properties are replaced by an empty map when they are null and the resource rule is enabled.
 * @param sender
 *   not read by this method
 * @return
 *   the reused engine node with the labels of the matching candidate attached, or `null` when
 *   the `ReuseExclusionLabel` consists of the single wildcard constant (reuse is switched off)
 * @throws LinkisRetryException
 *   when no candidate exists, no candidate has sufficient resource, the selector returns
 *   nothing, the attempt limit is exceeded, or the wait times out
 */
override def reuseEngine(engineReuseRequest: EngineReuseRequest, sender: Sender): EngineNode = {
  val taskId = JobUtils.getJobIdFromStringMap(engineReuseRequest.getProperties)
  logger.info(s"Task $taskId Start to reuse Engine for request: $engineReuseRequest")
  val labelBuilderFactory = LabelBuilderFactoryContext.getLabelBuilderFactory
  val labelList = LabelUtils
    .distinctLabel(
      labelBuilderFactory.getLabels(engineReuseRequest.getLabels),
      userLabelService.getUserLabels(engineReuseRequest.getUser)
    )
    .asScala

  // The instances array of an exclusion label may be null; normalise it to an empty array here
  // so that neither the wildcard check nor the filtering step below can hit a
  // NullPointerException.
  val exclusionInstances: Array[String] = labelList
    .collectFirst { case exclusionLabel: ReuseExclusionLabel => exclusionLabel.getInstances }
    .flatMap(labelInstances => Option(labelInstances))
    .getOrElse(Array.empty[String])

  val reuseDisabled =
    exclusionInstances.length == 1 &&
      exclusionInstances(0) == GovernanceCommonConf.WILDCARD_CONSTANT
  if (reuseDisabled) {
    logger.info(
      s"Task $taskId exists ReuseExclusionLabel and the configuration does not choose to reuse EC"
    )
    // Callers receive null when reuse is explicitly switched off for this request.
    return null
  }

  var filterLabelList = labelList.filter(_.isInstanceOf[EngineNodeLabel]).asJava
  val engineConnAliasLabel = labelBuilderFactory.createLabel(classOf[AliasServiceInstanceLabel])
  engineConnAliasLabel.setAlias(GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue)
  filterLabelList.add(engineConnAliasLabel)

  // label chooser
  if (null != engineReuseLabelChoosers) {
    engineReuseLabelChoosers.asScala.foreach { chooser =>
      filterLabelList = chooser.chooseLabels(filterLabelList)
    }
  }

  val instances = nodeLabelService.getScoredNodeMapsByLabels(filterLabelList)
  if (null != instances && exclusionInstances.nonEmpty) {
    // Snapshot the keys first: entries are removed from the map while iterating.
    instances.asScala.keys.toArray
      .filter { instance =>
        exclusionInstances.exists(_.equalsIgnoreCase(instance.getServiceInstance.getInstance))
      }
      .foreach { instance =>
        logger.info(
          s"will be not reuse ${instance.getServiceInstance}, cause use exclusion label"
        )
        instances.remove(instance)
      }
  }
  if (null == instances || instances.isEmpty) {
    throw new LinkisRetryException(
      AMConstant.ENGINE_ERROR_CODE,
      s"No engine can be reused, cause from db is null"
    )
  }

  var engineScoreList =
    getEngineNodeManager.getEngineNodes(instances.asScala.keys.toSeq.toArray)

  // Work out the resource the request needs and drop engines that cannot provide it.
  if (AMConfiguration.EC_REUSE_WITH_RESOURCE_RULE_ENABLE) {
    val labels: util.List[Label[_]] =
      engineCreateService.buildLabel(engineReuseRequest.getLabels, engineReuseRequest.getUser)
    if (engineReuseRequest.getProperties == null) {
      engineReuseRequest.setProperties(new util.HashMap[String, String]())
    }
    val engineType: String = LabelUtil.getEngineType(labels)
    val resourceRuleApplies =
      StringUtils.isNotBlank(engineType) &&
        AMConfiguration.EC_REUSE_WITH_RESOURCE_WITH_ECS.contains(engineType.toLowerCase())
    if (resourceRuleApplies) {
      val resource = engineCreateService.generateResource(
        engineReuseRequest.getProperties,
        engineReuseRequest.getUser,
        labelFilter.choseEngineLabel(labels),
        AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong
      )
      // Filter out engines whose resource is insufficient.
      engineScoreList = engineScoreList.filter { candidate =>
        candidate.getNodeStatus == NodeStatus.Unlock &&
        Option(candidate.getNodeResource)
          .flatMap { nodeResource =>
            // Prefer the used resource: an engine is only reused when it covers the required
            // resource. Without a used resource the engine is still starting, so its locked
            // resource is compared instead; whether it is finally reused is decided by the
            // regular reuse logic below.
            Option(nodeResource.getUsedResource).orElse(Option(nodeResource.getLockedResource))
          }
          // An engine that reports no resource at all cannot be shown to be sufficient, so it
          // is skipped instead of failing the whole request with a NullPointerException.
          .exists(_.notLess(resource.getMaxResource))
      }
    }
    if (engineScoreList.isEmpty) {
      throw new LinkisRetryException(
        AMConstant.ENGINE_ERROR_CODE,
        s"No engine can be reused, cause all engine resources are not sufficient."
      )
    }
  }

  var engine: EngineNode = null
  var count = 1
  val timeout =
    if (engineReuseRequest.getTimeOut <= 0) {
      AMConfiguration.ENGINE_REUSE_MAX_TIME.getValue.toLong
    } else engineReuseRequest.getTimeOut
  val reuseLimit: Int =
    if (engineReuseRequest.getReuseCount <= 0) AMConfiguration.ENGINE_REUSE_COUNT_LIMIT
    else engineReuseRequest.getReuseCount

  // One reuse attempt: returns true once `engine` holds a successfully reused node. A failed
  // candidate is removed from the candidate list so the next attempt picks another one.
  def selectEngineToReuse: Boolean = {
    if (count > reuseLimit) {
      throw new LinkisRetryException(
        AMConstant.ENGINE_ERROR_CODE,
        s"Engine reuse exceeds limit: $reuseLimit"
      )
    }
    val choseNode = nodeSelector.choseNode(engineScoreList.toArray)
    if (choseNode.isEmpty) {
      throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, "No engine can be reused")
    }
    val engineNode = choseNode.get.asInstanceOf[EngineNode]
    logger.info(s"prepare to reuse engineNode: ${engineNode.getServiceInstance}")
    engine = Utils.tryCatch(getEngineNodeManager.reuseEngine(engineNode)) { t: Throwable =>
      logger.info(s"Failed to reuse engine ${engineNode.getServiceInstance}", t)
      // If the reuse attempt failed on a timeout, ask for the engine to be stopped
      // asynchronously as the admin user.
      if (ExceptionUtils.getRootCause(t).isInstanceOf[TimeoutException]) {
        logger.info(s"Failed to reuse ${engineNode.getServiceInstance}, now to stop this")
        val stopEngineRequest =
          new EngineStopRequest(engineNode.getServiceInstance, ManagerUtils.getAdminUser)
        engineStopService.asyncStopEngine(stopEngineRequest)
      }
      null
    }
    if (null == engine) {
      count = count + 1
      engineScoreList = engineScoreList.filter(!_.equals(choseNode.get))
    }
    null != engine
  }

  val startTime = System.currentTimeMillis()
  try {
    Utils.waitUntil(() => selectEngineToReuse, Duration(timeout, TimeUnit.MILLISECONDS))
  } catch {
    case e: TimeoutException =>
      throw new LinkisRetryException(
        AMConstant.ENGINE_ERROR_CODE,
        s"Waiting for Engine initialization failure, already waiting $timeout ms"
      )
    case t: Throwable =>
      logger.info(
        s"Failed to reuse engineConn time taken ${System.currentTimeMillis() - startTime}"
      )
      throw t
  }
  val timeTaken = System.currentTimeMillis() - startTime
  logger.info(
    s"Finished to reuse Engine for request: $engineReuseRequest get EngineNode $engine, time taken $timeTaken"
  )

  // Attach the labels that were looked up for the chosen instance to the returned node.
  val engineServiceLabelList =
    instances.asScala.filter(kv => kv._1.getServiceInstance.equals(engine.getServiceInstance))
  if (null != engineServiceLabelList && engineServiceLabelList.nonEmpty) {
    engine.setLabels(engineServiceLabelList.head._2)
  } else {
    logger.info(
      "Get choosen engineNode : " + AMUtils.GSON
        .toJson(engine) + " from engineLabelMap : " + AMUtils.GSON.toJson(instances)
    )
  }
  engine
}