in linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/restful/RMMonitorRest.scala [368:517]
/**
 * Builds a report for one Yarn queue: overall queue usage plus a per-user breakdown.
 *
 * Reads the keys `queuename`, `clustername`, `clustertype` and the optional `crossCluster`
 * from `param`. When `crossCluster` is true the cluster name is replaced by
 * `AMConfiguration.PRIORITY_CLUSTER_TARGET`.
 *
 * The returned message carries two data entries:
 *   - `queueInfo`: max/used resources, used percentages and application counters of the queue;
 *   - `userResources`: one record per user with `busyPercentage`, `idlePercentage` and
 *     `totalPercentage`, ordered by `totalPercentage` descending.
 *
 * Failures while building the per-user breakdown are logged and leave `userResources` with
 * whatever records were collected up to that point.
 *
 * @param request current HTTP request, used to resolve the operating user
 * @param param   request body as a key/value map
 * @return the message populated with `queueInfo` and `userResources`
 */
def getQueueResource(
    request: HttpServletRequest,
    @RequestBody param: util.Map[String, AnyRef]
): Message = {
  ModuleUserUtils.getOperationUser(request, "getQueueResource")
  val message = Message.ok("")
  val yarnIdentifier = new YarnResourceIdentifier(param.get("queuename").asInstanceOf[String])
  // Accept both a string ("true") and a real boolean value; an absent or null entry means false.
  val crossCluster = Option(param.get("crossCluster"))
    .exists(flag => java.lang.Boolean.parseBoolean(flag.toString))
  // For DSS cross-cluster resource queries: when crossCluster is true the requested cluster
  // name is ignored in favour of the configured priority cluster target.
  val clustername =
    if (crossCluster) AMConfiguration.PRIORITY_CLUSTER_TARGET
    else param.get("clustername").asInstanceOf[String]
  val clusterLabel = labelFactory.createLabel(classOf[ClusterLabel])
  clusterLabel.setClusterName(clustername)
  clusterLabel.setClusterType(param.get("clustertype").asInstanceOf[String])
  val labelContainer = new RMLabelContainer(Lists.newArrayList(clusterLabel))
  val providedYarnResource =
    externalResourceService.getResource(ResourceType.Yarn, labelContainer, yarnIdentifier)
  val maxResource = providedYarnResource.getMaxResource.asInstanceOf[YarnResource]
  val usedResource = providedYarnResource.getUsedResource.asInstanceOf[YarnResource]

  val usedMemoryPercentage =
    usedResource.getQueueMemory.toDouble / maxResource.getQueueMemory.toDouble
  val usedCPUPercentage =
    usedResource.getQueueCores.toDouble / maxResource.getQueueCores.toDouble

  val queueInfo = new mutable.HashMap[String, Any]()
  // NOTE(review): this stores the whole max-resource object under "queuename"; it may have been
  // meant to be the queue name only. Left as is because clients may rely on the current shape.
  queueInfo.put("queuename", maxResource)
  queueInfo.put(
    "maxResources",
    Map("memory" -> maxResource.getQueueMemory, "cores" -> maxResource.getQueueCores)
  )
  queueInfo.put(
    "usedResources",
    Map("memory" -> usedResource.getQueueMemory, "cores" -> usedResource.getQueueCores)
  )
  queueInfo.put(
    "usedPercentage",
    Map("memory" -> usedMemoryPercentage, "cores" -> usedCPUPercentage)
  )
  queueInfo.put("maxApps", providedYarnResource.getMaxApps)
  queueInfo.put("numActiveApps", providedYarnResource.getNumActiveApps)
  queueInfo.put("numPendingApps", providedYarnResource.getNumPendingApps)
  appendMessageData(message, "queueInfo", queueInfo)

  // Per-user figures are expressed in the dimension (memory or cores) that is currently the
  // more heavily used one on the queue.
  val memoryDominant = usedMemoryPercentage > usedCPUPercentage
  def shareOfQueue(part: YarnResource): Double =
    if (memoryDominant) part.getQueueMemory.toDouble / maxResource.getQueueMemory.toDouble
    else part.getQueueCores.toDouble / maxResource.getQueueCores.toDouble

  val userResourceRecords = new ArrayBuffer[mutable.HashMap[String, Any]]()
  val yarnAppsInfo =
    externalResourceService.getAppInfo(ResourceType.Yarn, labelContainer, yarnIdentifier)
  // Group once and reuse for both the user list and the per-user aggregation.
  val appsByUser = yarnAppsInfo.asScala.groupBy(_.asInstanceOf[YarnAppInfo].getUser)
  val userList = appsByUser.keys.toList.asJava
  Utils.tryCatch {
    val nodesList = getEngineNodesByUserList(userList, true)
    appsByUser.foreach { case (user, userApps) =>
      var busyResource = Resource.initResource(ResourceType.Yarn).asInstanceOf[YarnResource]
      var idleResource = Resource.initResource(ResourceType.Yarn).asInstanceOf[YarnResource]

      // Index this user's engine nodes that run on the requested queue by Yarn application id.
      val appIdToEngineNode = new mutable.HashMap[String, EngineNode]()
      nodesList.get(user).foreach { userNodes =>
        userNodes.foreach { node =>
          if (node.getNodeResource != null && node.getNodeResource.getUsedResource != null) {
            node.getNodeResource.getUsedResource match {
              case driverYarn: DriverAndYarnResource
                  if driverYarn.getYarnResource.getQueueName
                    .equals(yarnIdentifier.getQueueName) =>
                appIdToEngineNode.put(driverYarn.getYarnResource.getApplicationId, node)
              case yarn: YarnResource if yarn.getQueueName.equals(yarnIdentifier.getQueueName) =>
                appIdToEngineNode.put(yarn.getApplicationId, node)
              case _ =>
            }
          }
        }
      }

      userApps.foreach { appInfo =>
        val yarnApp = appInfo.asInstanceOf[YarnAppInfo]
        appIdToEngineNode.get(yarnApp.getId) match {
          case Some(node) if NodeStatus.Busy != node.getNodeStatus =>
            idleResource = idleResource.add(yarnApp.getUsedResource)
          case _ =>
            // Busy engines, and applications with no matching engine node, count as busy.
            busyResource = busyResource.add(yarnApp.getUsedResource)
        }
      }

      val totalResource = busyResource.add(idleResource)
      if (totalResource.moreThan(Resource.getZeroResource(totalResource))) {
        val userResource = new mutable.HashMap[String, Any]()
        userResource.put("username", user)
        userResource.put("busyPercentage", shareOfQueue(busyResource))
        userResource.put("idlePercentage", shareOfQueue(idleResource))
        userResource.put("totalPercentage", shareOfQueue(totalResource))
        userResourceRecords += userResource
      }
    }
  } { case failure: Throwable =>
    // Best effort: the queue-level information is still returned when this part fails.
    logger.error(s"queresource search failed!", failure)
  }

  // Descending by totalPercentage. Double.compare yields 0 for equal values and a total order
  // (including NaN), which the Comparator contract requires for a safe sort.
  userResourceRecords.asJava.sort(new Comparator[mutable.Map[String, Any]]() {
    override def compare(o1: mutable.Map[String, Any], o2: mutable.Map[String, Any]): Int =
      java.lang.Double.compare(
        o2.getOrElse("totalPercentage", 0.0).asInstanceOf[Double],
        o1.getOrElse("totalPercentage", 0.0).asInstanceOf[Double]
      )
  })
  appendMessageData(message, "userResources", userResourceRecords)
}