def getQueueResource()

in linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala [368:517]


  /**
   * Reports the usage of one YARN queue together with a per-user breakdown.
   *
   * The request body is read for `queuename`, `clustername`, `clustertype` and the optional
   * `crossCluster` flag. On success the returned message carries two entries:
   *  - `queueInfo`: max/used resources, used percentages and application counters of the queue;
   *  - `userResources`: one record per user (`username`, `busyPercentage`, `idlePercentage`,
   *    `totalPercentage`), sorted by `totalPercentage` in descending order.
   *
   * A failure while collecting the per-user records is logged and the records gathered so far
   * are still returned.
   *
   * @param request the HTTP request, used to resolve the operating user
   * @param param   the request body
   * @return the populated message, or an error message when the external resource service does
   *         not provide YARN max/used resources for the queue
   */
  def getQueueResource(
      request: HttpServletRequest,
      @RequestBody param: util.Map[String, AnyRef]
  ): Message = {
    ModuleUserUtils.getOperationUser(request, "getQueueResource")
    val message = Message.ok("")
    val yarnIdentifier = new YarnResourceIdentifier(param.get("queuename").asInstanceOf[String])
    // String.valueOf accepts the flag both as a JSON boolean and as a JSON string; a plain cast to
    // String would throw ClassCastException for a real boolean.
    val crossCluster = java.lang.Boolean.parseBoolean(
      String.valueOf(param.getOrDefault("crossCluster", "false"))
    )
    // For DSS cross cluster resource queries: when crossCluster is true the requested cluster
    // name is replaced by AMConfiguration.PRIORITY_CLUSTER_TARGET.
    val clustername =
      if (crossCluster) AMConfiguration.PRIORITY_CLUSTER_TARGET
      else param.get("clustername").asInstanceOf[String]
    val clusterLabel = labelFactory.createLabel(classOf[ClusterLabel])
    clusterLabel.setClusterName(clustername)
    clusterLabel.setClusterType(param.get("clustertype").asInstanceOf[String])
    val labelContainer = new RMLabelContainer(Lists.newArrayList(clusterLabel))
    val providedYarnResource =
      externalResourceService.getResource(ResourceType.Yarn, labelContainer, yarnIdentifier)

    // Type patterns reject null and non-YARN resources, so the fallback case is reachable and its
    // error message is actually returned to the caller.
    (providedYarnResource.getMaxResource, providedYarnResource.getUsedResource) match {
      case (maxResource: YarnResource, usedResource: YarnResource) =>
        val usedMemoryPercentage =
          usedResource.getQueueMemory.toDouble / maxResource.getQueueMemory.toDouble
        val usedCPUPercentage =
          usedResource.getQueueCores.toDouble / maxResource.getQueueCores.toDouble

        val queueInfo = new mutable.HashMap[String, Any]()
        // NOTE(review): this publishes the whole max resource object under "queuename", not the
        // queue name; kept unchanged because clients may rely on it - confirm.
        queueInfo.put("queuename", maxResource)
        queueInfo.put(
          "maxResources",
          Map("memory" -> maxResource.getQueueMemory, "cores" -> maxResource.getQueueCores)
        )
        queueInfo.put(
          "usedResources",
          Map("memory" -> usedResource.getQueueMemory, "cores" -> usedResource.getQueueCores)
        )
        queueInfo.put(
          "usedPercentage",
          Map("memory" -> usedMemoryPercentage, "cores" -> usedCPUPercentage)
        )
        queueInfo.put("maxApps", providedYarnResource.getMaxApps)
        queueInfo.put("numActiveApps", providedYarnResource.getNumActiveApps)
        queueInfo.put("numPendingApps", providedYarnResource.getNumPendingApps)
        appendMessageData(message, "queueInfo", queueInfo)

        // Per-user shares are expressed in the dimension (memory or cores) that is currently the
        // more heavily used one on the queue.
        val shareByMemory = usedMemoryPercentage > usedCPUPercentage
        def shareOfQueue(part: YarnResource): Double =
          if (shareByMemory) part.getQueueMemory.toDouble / maxResource.getQueueMemory.toDouble
          else part.getQueueCores.toDouble / maxResource.getQueueCores.toDouble

        // Sums the used resources of the given applications, starting from an empty resource.
        def sumUsed(appInfos: Iterable[YarnAppInfo]): YarnResource = {
          val zero = Resource.initResource(ResourceType.Yarn).asInstanceOf[YarnResource]
          appInfos.foldLeft(zero)((acc, app) => acc.add(app.getUsedResource))
        }

        val userResourceRecords = new ArrayBuffer[mutable.HashMap[String, Any]]()
        val yarnAppsInfo =
          externalResourceService.getAppInfo(ResourceType.Yarn, labelContainer, yarnIdentifier)
        val appsByUser = yarnAppsInfo.asScala.groupBy(_.asInstanceOf[YarnAppInfo].getUser)
        val userList = appsByUser.keys.toList.asJava
        Utils.tryCatch {
          val nodesList = getEngineNodesByUserList(userList, true)
          appsByUser.foreach { case (user, apps) =>
            // Index the user's engine nodes that run on this queue by YARN application id.
            val appIdToEngineNode = new mutable.HashMap[String, EngineNode]()
            nodesList.get(user).foreach(_.foreach { node =>
              Option(node.getNodeResource).foreach { nodeResource =>
                // A null used resource matches neither type pattern and is ignored.
                nodeResource.getUsedResource match {
                  case driverYarn: DriverAndYarnResource
                      if driverYarn.getYarnResource.getQueueName == yarnIdentifier.getQueueName =>
                    appIdToEngineNode.put(driverYarn.getYarnResource.getApplicationId, node)
                  case yarn: YarnResource if yarn.getQueueName == yarnIdentifier.getQueueName =>
                    appIdToEngineNode.put(yarn.getApplicationId, node)
                  case _ =>
                }
              }
            })

            // An application counts as idle only when it maps to a known engine node that is not
            // busy; applications without a matching engine node count as busy.
            val (idleApps, busyApps) = apps.map(_.asInstanceOf[YarnAppInfo]).partition { app =>
              appIdToEngineNode.get(app.getId).exists(node => NodeStatus.Busy != node.getNodeStatus)
            }
            val busyResource = sumUsed(busyApps)
            val idleResource = sumUsed(idleApps)
            val totalResource = busyResource.add(idleResource)
            if (totalResource.moreThan(Resource.getZeroResource(totalResource))) {
              val userResource = new mutable.HashMap[String, Any]()
              userResource.put("username", user)
              userResource.put("busyPercentage", shareOfQueue(busyResource))
              userResource.put("idlePercentage", shareOfQueue(idleResource))
              userResource.put("totalPercentage", shareOfQueue(totalResource))
              userResourceRecords += userResource
            }
          }
        } { case t: Throwable =>
          // Best effort: keep whatever records were collected and report the failure in the log.
          logger.error("queresource search failed!", t)
        }

        // Descending by total share. Double.compare returns 0 for equal values, which keeps the
        // Comparator contract that the sort relies on.
        userResourceRecords.asJava.sort(new Comparator[mutable.Map[String, Any]]() {
          override def compare(o1: mutable.Map[String, Any], o2: mutable.Map[String, Any]): Int =
            java.lang.Double.compare(
              o2.getOrElse("totalPercentage", 0.0).asInstanceOf[Double],
              o1.getOrElse("totalPercentage", 0.0).asInstanceOf[Double]
            )
        })
        appendMessageData(message, "userResources", userResourceRecords)
      case _ => Message.error("Failed to get queue resource")
    }
  }