in linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala [149:281]
/**
 * Reconciles the recorded resource bookkeeping with what the engine nodes currently report.
 *
 * Two independent passes are performed, each behind its own configuration switch:
 *   - ECM pass: runs when `AMConfiguration.AM_ECM_RESET_RESOURCE` is enabled and
 *     `serviceInstance` is not blank.
 *   - User pass: runs when `AMConfiguration.AM_USER_RESET_RESOURCE` is enabled and `username`
 *     is not blank.
 *
 * Exceptions raised while locking or resetting are not caught here; they abort the remaining
 * work and propagate to the caller.
 *
 * @param serviceInstance
 *   ECM instance to reset; `"*"` selects every ECM returned by `getAllEM()`
 * @param username
 *   user whose label resources are reset; `"*"` selects every user
 */
override def resetResource(serviceInstance: String, username: String): Unit = {
  // ECM switch
  if (AMConfiguration.AM_ECM_RESET_RESOURCE && StringUtils.isNotBlank(serviceInstance)) {
    resetEcmResources(serviceInstance)
  }
  // User switch
  if (AMConfiguration.AM_USER_RESET_RESOURCE && StringUtils.isNotBlank(username)) {
    resetUserResources(username)
  }
}

/**
 * Recomputes the used/locked/left resources of the selected ECMs from their engine nodes and
 * resets the stored record whenever it differs.
 */
private def resetEcmResources(serviceInstance: String): Unit = {
  val allEcms = getAllEM()
  val targetEcms =
    if (serviceInstance == "*") allEcms
    else allEcms.filter(_.getServiceInstance.getInstance.equals(serviceInstance))

  targetEcms.foreach { ecmInstance =>
    val instanceName = ecmInstance.getServiceInstance.getInstance
    // An ECM without an EMInstanceLabel cannot be locked; skip it instead of failing the
    // whole run with a bare NoSuchElementException.
    ecmInstance.getLabels.find(_.isInstanceOf[EMInstanceLabel]) match {
      case None =>
        logger.warn(s"ECM:$instanceName has no EMInstanceLabel, skip resource reset")
      case Some(emInstanceLabel) =>
        logger.info(
          MessageFormat.format("ECM:{0} will be marked as unhealthy and locked", instanceName)
        )
        val lock = resourceManager.tryLockOneLabel(emInstanceLabel, -1, Utils.getJvmUser)
        Utils.tryFinally {
          // Marked unhealthy inside the guarded section so that a failure here still
          // releases the lock acquired above.
          engineInfoService
            .updateEngineHealthyStatus(ecmInstance.getServiceInstance, NodeHealthy.UnHealthy)
          // Resources of every engine node under this ECM
          val engineResources =
            engineInfoService.listEMEngines(ecmInstance).asScala.map(_.getNodeResource).toArray
          // Aggregate what the nodes report: (total, used, locked)
          val (realSumResource, useResource, lockResource) =
            collectResource(engineResources, ResourceType.LoadInstance)
          // What the ECM record currently claims
          val ecmNodeResource = ecmInstance.getNodeResource
          val recordDiffers =
            !useResource.equalsTo(ecmNodeResource.getUsedResource) ||
              !lockResource.equalsTo(ecmNodeResource.getLockedResource)
          if (recordDiffers) {
            logger.info(
              MessageFormat.format(
                "ECM:{0} resources will be reset, Record Resources:{1} ,Real Resources:{2}",
                instanceName,
                ecmNodeResource.getUsedResource.add(ecmNodeResource.getLockedResource),
                realSumResource
              )
            )
            ecmNodeResource.setLockedResource(lockResource)
            ecmNodeResource.setLeftResource(ecmNodeResource.getMaxResource.minus(realSumResource))
            ecmNodeResource.setUsedResource(useResource)
            val persistence = ResourceUtils.toPersistenceResource(ecmNodeResource)
            labelManagerPersistence.getLabelByResource(persistence).headOption match {
              case Some(resourceLabel) =>
                resourceManager.resetResource(resourceLabel, ecmNodeResource)
              case None =>
                logger.warn(
                  s"ECM:$instanceName has no label bound to its resource record, skip reset"
                )
            }
          }
        } {
          logger.info(
            MessageFormat.format(
              "ECM:{0} will be marked as healthy and the lock will be released",
              instanceName
            )
          )
          // Same order as before (unlock, then healthy), but a failing unlock can no longer
          // leave the ECM stuck in the unhealthy state.
          // NOTE(review): the ECM is always set back to Healthy, whatever its status was
          // before the reset started - confirm this is intended.
          Utils.tryFinally {
            resourceManager.unLock(lock)
          } {
            engineInfoService
              .updateEngineHealthyStatus(ecmInstance.getServiceInstance, NodeHealthy.Healthy)
          }
        }
    }
  }
}

/**
 * Recomputes the used/locked/left resources of every matching
 * `combined_userCreator_engineType` label from the user's engine nodes and resets the stored
 * record whenever it differs.
 */
private def resetUserResources(username: String): Unit = {
  // "*" selects every user: the user part of the pattern is left empty.
  val userFragment = if (username == "*") "" else username
  // Renders to "%%,%<user>%,%%,%" (presumably used as a SQL LIKE pattern - confirm against
  // getLabelByPattern).
  val labelValuePattern =
    MessageFormat.format("%{0}%,%{1}%,%{2}%,%", "", userFragment, "")
  val userLabels = labelManagerPersistence.getLabelByPattern(
    labelValuePattern,
    "combined_userCreator_engineType",
    null,
    null
  )
  // Resource records attached to those labels
  val userLabelResources = resourceManagerPersistence.getResourceByLabels(userLabels).asScala

  userLabelResources.foreach { userLabelResource =>
    val labelValue = userLabelResource.getCreator
    val labelUser = LabelUtil.getFromLabelStr(labelValue, "user")
    // A record without a label cannot be locked; skip it instead of failing the whole run
    // with a bare NoSuchElementException.
    labelManagerPersistence.getLabelByResource(userLabelResource).headOption match {
      case None =>
        logger.warn(s"LabelUser:$labelUser resource record has no label, skip resource reset")
      case Some(userLabel) =>
        userLabel.setStringValue(labelValue)
        // Lock the userCreator/engineType label while its record is compared and rewritten
        val lock = resourceManager.tryLockOneLabel(userLabel, -1, labelUser)
        Utils.tryFinally {
          val userPersistenceResource = ResourceUtils.fromPersistenceResource(userLabelResource)
          val userResourceType = ResourceType.valueOf(userLabelResource.getResourceType)
          val userEngineNodes = nodeLabelService.getEngineNodesWithResourceByUser(labelUser, true)
          // Keep only the engines whose "<userCreator>,<engineType>" label string matches
          // this record.
          val matchingNodeResources = userEngineNodes
            .filter { node =>
              val userCreatorLabelStr =
                LabelUtil.getUserCreatorLabel(node.getLabels).getStringValue
              val engineTypeLabelStr = LabelUtil.getEngineTypeLabel(node.getLabels).getStringValue
              labelValue.equalsIgnoreCase(s"${userCreatorLabelStr},${engineTypeLabelStr}")
            }
            .map(_.getNodeResource)
          // Aggregate what the nodes report: (total, used, locked)
          val (sumResource, usedResource, lockResource) =
            collectResource(matchingNodeResources, userResourceType)
          val recordDiffers =
            !usedResource.equalsTo(userPersistenceResource.getUsedResource) ||
              !lockResource.equalsTo(userPersistenceResource.getLockedResource)
          if (recordDiffers) {
            logger.info(
              MessageFormat.format(
                "LabelUser:{0} resources will be reset, Record Resources:{1} ,Real Resources:{2}",
                labelUser,
                userPersistenceResource.getUsedResource
                  .add(userPersistenceResource.getLockedResource),
                sumResource
              )
            )
            userPersistenceResource.setLeftResource(
              userPersistenceResource.getMaxResource.minus(sumResource)
            )
            userPersistenceResource.setUsedResource(usedResource)
            userPersistenceResource.setLockedResource(lockResource)
            resourceManager.resetResource(userLabel, userPersistenceResource)
          }
        } {
          resourceManager.unLock(lock)
        }
    }
  }
}