override def resetResource()

in linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/em/DefaultEMInfoService.scala [149:281]


  /**
   * Recomputes the resources recorded for ECM nodes and/or user labels from the resources of the
   * engine nodes currently listed for them, and rewrites the record when the two disagree.
   *
   * The ECM pass runs only when `AMConfiguration.AM_ECM_RESET_RESOURCE` is enabled and
   * `serviceInstance` is not blank. The user pass runs only when
   * `AMConfiguration.AM_USER_RESET_RESOURCE` is enabled and `username` is not blank.
   *
   * While an ECM is processed its instance label is locked and the node is marked
   * `NodeHealthy.UnHealthy`; afterwards the lock is released and the node is marked
   * `NodeHealthy.Healthy`, whether or not the reset succeeded. Exceptions raised while processing
   * an entry still propagate to the caller after that cleanup.
   *
   * @param serviceInstance
   *   instance name of the ECM to reset, or "*" to process every node returned by `getAllEM()`
   * @param username
   *   user whose label resources are reset; "*" is replaced by an empty user segment in the label
   *   value pattern (presumably matching every user - confirm against `getLabelByPattern`)
   */
  override def resetResource(serviceInstance: String, username: String): Unit = {
    // ECM pass (guarded by its configuration switch)
    if (AMConfiguration.AM_ECM_RESET_RESOURCE && StringUtils.isNotBlank(serviceInstance)) {
      val filteredECMs = if (serviceInstance.equals("*")) {
        getAllEM()
      } else {
        getAllEM().filter(_.getServiceInstance.getInstance.equals(serviceInstance))
      }
      // Process every selected ECM
      filteredECMs.foreach { ecmInstance =>
        val ecmServiceInstance = ecmInstance.getServiceInstance
        // headOption instead of head: an ECM without an instance label is skipped rather than
        // aborting the whole reset with a NoSuchElementException.
        ecmInstance.getLabels.filter(_.isInstanceOf[EMInstanceLabel]).headOption match {
          case None =>
            logger.warn(
              MessageFormat.format(
                "ECM:{0} has no EMInstanceLabel, resource reset is skipped",
                ecmServiceInstance.getInstance
              )
            )
          case Some(eMInstanceLabel) =>
            // lock ECMInstance && set unhealthy
            logger.info(
              MessageFormat.format(
                "ECM:{0} will be marked as unhealthy and locked",
                ecmServiceInstance.getInstance
              )
            )
            val lock =
              resourceManager.tryLockOneLabel(eMInstanceLabel, -1, Utils.getJvmUser)
            Utils.tryFinally {
              // Marking the node unhealthy happens inside the guarded section so that a failure
              // here can no longer leak the lock acquired above.
              engineInfoService
                .updateEngineHealthyStatus(ecmServiceInstance, NodeHealthy.UnHealthy)
              // Resources of all engine nodes under this ECM
              val nodeResource =
                engineInfoService
                  .listEMEngines(ecmInstance)
                  .asScala
                  .map(_.getNodeResource)
                  .toArray
              // Aggregate what the nodes really hold (total, used, locked)
              val (realSumResource, useResource, lockResource) =
                collectResource(nodeResource, ResourceType.LoadInstance)
              // Resource currently recorded for the ECM
              val ecmNodeResource = ecmInstance.getNodeResource
              // Reset only when the record differs from the aggregated values
              if (
                  !useResource.equalsTo(ecmNodeResource.getUsedResource) ||
                  !lockResource.equalsTo(ecmNodeResource.getLockedResource)
              ) {
                logger.info(
                  MessageFormat.format(
                    "ECM:{0} resources will be reset, Record Resources:{1} ,Real Resources:{2}",
                    ecmServiceInstance.getInstance,
                    ecmNodeResource.getUsedResource.add(ecmNodeResource.getLockedResource),
                    realSumResource
                  )
                )
                ecmNodeResource.setLockedResource(lockResource)
                ecmNodeResource.setLeftResource(
                  ecmNodeResource.getMaxResource.minus(realSumResource)
                )
                ecmNodeResource.setUsedResource(useResource)
                val persistence = ResourceUtils.toPersistenceResource(ecmNodeResource)
                labelManagerPersistence.getLabelByResource(persistence).headOption match {
                  case Some(resourceLabel) =>
                    resourceManager.resetResource(resourceLabel, ecmNodeResource)
                  case None =>
                    logger.warn(
                      MessageFormat.format(
                        "ECM:{0} resource has no associated label, resource reset is skipped",
                        ecmServiceInstance.getInstance
                      )
                    )
                }
              }
            } {
              logger.info(
                MessageFormat.format(
                  "ECM:{0} will be marked as healthy and the lock will be released",
                  ecmServiceInstance.getInstance
                )
              )
              // Nested guard: the node is marked healthy again even when unLock throws.
              Utils.tryFinally {
                resourceManager.unLock(lock)
              } {
                engineInfoService
                  .updateEngineHealthyStatus(ecmServiceInstance, NodeHealthy.Healthy)
              }
            }
        }
      }
    }

    // User pass (guarded by its configuration switch)
    if (AMConfiguration.AM_USER_RESET_RESOURCE && StringUtils.isNotBlank(username)) {
      // "*" becomes an empty user segment in the label value pattern
      val user = if (username.equals("*")) {
        ""
      } else {
        username
      }
      val labelValuePattern =
        MessageFormat.format("%{0}%,%{1}%,%{2}%,%", "", user, "")
      val userLabels = labelManagerPersistence.getLabelByPattern(
        labelValuePattern,
        "combined_userCreator_engineType",
        null,
        null
      )
      // Resources associated with those labels
      val userLabelResources = resourceManagerPersistence.getResourceByLabels(userLabels).asScala
      // Process every user label resource
      userLabelResources.foreach { userLabelResource =>
        val labelUser = LabelUtil.getFromLabelStr(userLabelResource.getCreator, "user")
        // headOption instead of head: a resource without a label is skipped rather than
        // aborting the whole reset with a NoSuchElementException.
        labelManagerPersistence.getLabelByResource(userLabelResource).headOption match {
          case None =>
            logger.warn(
              MessageFormat.format(
                "Resource of {0} has no associated label, resource reset is skipped",
                userLabelResource.getCreator
              )
            )
          case Some(resourceLabel) =>
            resourceLabel.setStringValue(userLabelResource.getCreator)
            // lock userCreatorEngineTypeLabel
            val lock = resourceManager.tryLockOneLabel(resourceLabel, -1, labelUser)
            Utils.tryFinally {
              val userPersistenceResource =
                ResourceUtils.fromPersistenceResource(userLabelResource)
              val userResourceType = ResourceType.valueOf(userLabelResource.getResourceType)
              val userEngineNodes =
                nodeLabelService.getEngineNodesWithResourceByUser(labelUser, true)
              // Keep only the nodes whose "userCreator,engineType" label string equals the
              // creator string of this resource (case-insensitive)
              val userEngineNodeFilter = userEngineNodes
                .filter { node =>
                  val userCreatorLabelStr =
                    LabelUtil.getUserCreatorLabel(node.getLabels).getStringValue
                  val engineTypeLabelStr =
                    LabelUtil.getEngineTypeLabel(node.getLabels).getStringValue
                  userLabelResource.getCreator.equalsIgnoreCase(
                    s"${userCreatorLabelStr},${engineTypeLabelStr}"
                  )
                }
                .map(_.getNodeResource)
              // Aggregate what the nodes really hold (total, used, locked)
              val (sumResource, usedResource, lockResource) =
                collectResource(userEngineNodeFilter, userResourceType)
              // Reset only when the record differs from the aggregated values
              if (
                  !usedResource.equalsTo(userPersistenceResource.getUsedResource) ||
                  !lockResource.equalsTo(userPersistenceResource.getLockedResource)
              ) {
                logger.info(
                  MessageFormat.format(
                    "LabelUser:{0} resources will be reset, Record Resources:{1} ,Real Resources:{2}",
                    labelUser,
                    userPersistenceResource.getUsedResource
                      .add(userPersistenceResource.getLockedResource),
                    sumResource
                  )
                )
                userPersistenceResource.setLeftResource(
                  userPersistenceResource.getMaxResource.minus(sumResource)
                )
                userPersistenceResource.setUsedResource(usedResource)
                userPersistenceResource.setLockedResource(lockResource)
                resourceManager.resetResource(resourceLabel, userPersistenceResource)
              }
            } {
              resourceManager.unLock(lock)
            }
        }
      }
    }
  }