in linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala [410:522]
/**
 * Moves the resource that was locked for an engine instance into the "used" state, first on the
 * engine instance label itself and then on every other resource label of the enriched label
 * set.
 *
 * Steps, in order:
 *   1. Enrich `labels` and resolve the engine instance label, its persisted resource and its
 *      engine node; fail fast if any of them is missing.
 *   2. On the engine instance label: set used = locked, let `updateYarnApplicationID` adjust the
 *      record from `usedResource`, set locked to zero and persist.
 *   3. On each remaining resource label (under a per-label lock): subtract the amount from
 *      locked, add it to used, persist and run `resourceCheck`.
 *
 * Failures in steps 2 and 3 are handled by handlers that only log (step 3 additionally calls
 * `resourceRollback` for the labels already updated); the handlers do not rethrow.
 *
 * @param labels
 *   labels describing the engine; must yield an engine instance label after enrichment
 * @param usedResource
 *   node resource passed to `updateYarnApplicationID` together with the locked resource record
 * @throws RMErrorException
 *   if the engine instance label is null, no engine node is found for it, or no locked resource
 *   exceeding the initial resource of its type is found
 */
override def resourceUsed(labels: util.List[Label[_]], usedResource: NodeResource): Unit = {
  val labelContainer = labelResourceService.enrichLabels(labels)
  // Read the engine instance label once; every later step refers to this same label.
  val engineInstanceLabel: EngineInstanceLabel = labelContainer.getEngineInstanceLabel
  if (null == engineInstanceLabel) {
    throw new RMErrorException(
      RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getErrorCode,
      "engine instance label is null"
    )
  }
  var lockedResource: NodeResource = null
  var persistenceResource: PersistenceResource = null
  try {
    persistenceResource = labelResourceService.getPersistenceResource(engineInstanceLabel)
    lockedResource = ResourceUtils.fromPersistenceResource(persistenceResource)
  } catch {
    case e: NullPointerException =>
      // Pass the exception to the logger so the stack trace is kept before rethrowing.
      logger.error(
        s"EngineInstanceLabel [${engineInstanceLabel}] cause NullPointerException",
        e
      )
      throw e
  }
  val nodeInstance = nodeManagerPersistence.getEngineNode(engineInstanceLabel.getServiceInstance)
  if (nodeInstance == null) {
    throw new RMErrorException(
      RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getErrorCode,
      s"No serviceInstance found by engine ${engineInstanceLabel}, current label resource ${lockedResource}"
    )
  }
  // There must be a locked amount strictly greater than the initial resource of this type.
  if (
      lockedResource == null || !lockedResource.getLockedResource.moreThan(
        Resource.initResource(lockedResource.getResourceType)
      )
  ) {
    throw new RMErrorException(
      RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getErrorCode,
      s"No locked resource found by engine ${engineInstanceLabel}, current label resource ${lockedResource}"
    )
  }
  // NOTE(review): getEMInstanceLabel is dereferenced without a null check — confirm that an EM
  // instance label is always present at this point.
  logger.info(
    s"resourceUsed ready:${labelContainer.getEMInstanceLabel.getServiceInstance}, used resource ${lockedResource.getLockedResource}"
  )
  // Snapshot of the amount being moved from locked to used; taken before the locked resource is
  // zeroed below so the same amount can be applied to the other labels and reported in logs.
  val addedResource =
    Resource.initResource(lockedResource.getResourceType).add(lockedResource.getLockedResource)

  // Step 2: engine instance label.
  Utils.tryCatch {
    lockedResource.setUsedResource(lockedResource.getLockedResource)
    updateYarnApplicationID(usedResource, lockedResource)
    lockedResource.setLockedResource(Resource.getZeroResource(lockedResource.getLockedResource))
    labelResourceService.setLabelResource(
      engineInstanceLabel,
      lockedResource,
      labelContainer.getCombinedResourceLabel.getStringValue
    )
    // Report the amount that was actually moved to used. The locked resource has just been
    // replaced by a zero resource, so logging it would always report an empty amount.
    resourceLogService.success(
      ChangeType.ENGINE_INIT,
      addedResource,
      engineInstanceLabel
    )
  } { case exception: Exception =>
    // Logged only; processing continues with the remaining labels.
    logger.error(
      s"${engineInstanceLabel.getStringValue} used resource failed!, resource: ${lockedResource}",
      exception
    )
  }

  // Step 3: every other resource label. Labels updated so far are remembered for rollback.
  val labelResourceSet = new mutable.HashSet[LabelResourceMapping]()
  Utils.tryCatch {
    labelContainer.getResourceLabels.asScala
      .filter(!_.isInstanceOf[EngineInstanceLabel])
      .foreach { label =>
        val persistenceLock =
          tryLockOneLabel(label, -1, labelContainer.getUserCreatorLabel.getUser)
        Utils.tryFinally {
          labelContainer.setCurrentLabel(label)
          val labelResource = labelResourceService.getLabelResource(label)
          if (labelResource != null) {
            labelResource.setLockedResource(labelResource.getLockedResource.minus(addedResource))
            if (null == labelResource.getUsedResource) {
              labelResource.setUsedResource(Resource.initResource(labelResource.getResourceType))
            }
            labelResource.setUsedResource(labelResource.getUsedResource.add(addedResource))
            labelResourceService.setLabelResource(
              label,
              labelResource,
              labelContainer.getCombinedResourceLabel.getStringValue
            )
            // Registered before resourceCheck so a failing check still rolls this label back.
            labelResourceSet.add(
              new LabelResourceMapping(label, addedResource, ResourceOperationType.USED)
            )
            resourceCheck(label, labelResource)
          }
        } {
          // Always release the per-label lock, whether or not the update succeeded.
          resourceLockService.unLock(persistenceLock)
        }
        // Record the user-level action for labels matching the combined resource label's class.
        if (label.getClass.isAssignableFrom(labelContainer.getCombinedResourceLabel.getClass)) {
          resourceLogService.recordUserResourceAction(
            labelContainer,
            persistenceResource.getTicketId,
            ChangeType.ENGINE_INIT,
            addedResource,
            NodeStatus.Running
          )
        }
      }
  } { case exception: Exception =>
    // Undo the label updates recorded so far, then log; the exception is not rethrown.
    resourceRollback(labelResourceSet, labelContainer.getUserCreatorLabel.getUser)
    logger.error(
      s"${engineInstanceLabel.getStringValue} used resource failed!, resource: ${lockedResource}",
      exception
    )
  }
}