in linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.scala [651:791]
override def resourceReleased(ecNode: EngineNode): Unit = {
val labelContainer = labelResourceService.enrichLabels(ecNode.getLabels)
if (null == labelContainer.getEngineInstanceLabel) {
throw new RMErrorException(
RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getErrorCode,
"engine instance label is null"
)
}
val instanceLock = tryLockOneLabel(
labelContainer.getEngineInstanceLabel,
RMUtils.RM_RESOURCE_LOCK_WAIT_TIME.getValue,
labelContainer.getUserCreatorLabel.getUser
)
Utils.tryFinally {
val persistenceResource: PersistenceResource =
labelResourceService.getPersistenceResource(labelContainer.getEngineInstanceLabel)
val usedResource = ResourceUtils.fromPersistenceResource(persistenceResource)
if (usedResource == null) {
throw new RMErrorException(
RMErrorCode.LABEL_RESOURCE_NOT_FOUND.getErrorCode,
s"No used resource found by engine ${labelContainer.getEngineInstanceLabel}"
)
}
logger.info(
s"resourceRelease ready:${labelContainer.getEngineInstanceLabel.getServiceInstance},current node resource${usedResource}"
)
val status = if (null == ecNode.getNodeStatus) {
getNodeStatus(labelContainer.getEngineInstanceLabel)
} else {
ecNode.getNodeStatus
}
labelContainer.getResourceLabels.asScala
.filter(!_.isInstanceOf[EngineInstanceLabel])
.foreach { label =>
Utils.tryCatch {
val persistenceLock = tryLockOneLabel(
label,
RMUtils.RM_RESOURCE_LOCK_WAIT_TIME.getValue,
labelContainer.getUserCreatorLabel.getUser
)
Utils.tryFinally {
val labelResource = labelResourceService.getLabelResource(label)
if (labelResource != null) {
if (label.isInstanceOf[EMInstanceLabel]) timeCheck(labelResource, usedResource)
if (
null != usedResource.getUsedResource && usedResource.getUsedResource.moreThan(
Resource
.initResource(usedResource.getResourceType)
)
) {
labelResource.setUsedResource(
labelResource.getUsedResource.minus(usedResource.getUsedResource)
)
labelResource.setLeftResource(
labelResource.getLeftResource.add(usedResource.getUsedResource)
)
}
if (
null != usedResource.getLockedResource && usedResource.getLockedResource
.moreThan(
Resource
.initResource(usedResource.getResourceType)
)
) {
labelResource.setLockedResource(
labelResource.getLockedResource.minus(usedResource.getLockedResource)
)
labelResource.setLeftResource(
labelResource.getLeftResource.add(usedResource.getLockedResource)
)
}
labelResourceService.setLabelResource(
label,
labelResource,
labelContainer.getCombinedResourceLabel.getStringValue
)
resourceCheck(label, labelResource)
}
} {
resourceLockService.unLock(persistenceLock)
}
val releasedResource = if (usedResource.getUsedResource != null) {
usedResource.getUsedResource
} else {
usedResource.getLockedResource
}
var heartbeatMsgMetrics = ""
Utils.tryAndWarn {
val oldMetrics = nodeMetricManagerPersistence.getNodeMetrics(ecNode)
if (oldMetrics != null && StringUtils.isNotBlank(oldMetrics.getHeartBeatMsg)) {
heartbeatMsgMetrics = oldMetrics.getHeartBeatMsg
}
}
if (label.getClass.isAssignableFrom(labelContainer.getCombinedResourceLabel.getClass)) {
resourceLogService.recordUserResourceAction(
labelContainer,
persistenceResource.getTicketId,
ChangeType.ENGINE_CLEAR,
releasedResource,
status,
heartbeatMsgMetrics
)
}
} { case exception: Exception =>
logger.error(
s"Failed to release resource label ${labelContainer.getEngineInstanceLabel.getStringValue}",
exception
)
}
}
val engineInstanceLabel = labelContainer.getEngineInstanceLabel
Utils.tryCatch {
labelResourceService.removeResourceByLabel(engineInstanceLabel)
resourceLogService.success(
ChangeType.ENGINE_CLEAR,
usedResource.getUsedResource,
engineInstanceLabel,
null
)
} {
case exception: Exception =>
resourceLogService.failed(
ChangeType.ENGINE_CLEAR,
usedResource.getUsedResource,
engineInstanceLabel,
null,
exception
)
throw exception
case _ =>
}
} {
logger.info(
s"Finished release instance ${labelContainer.getEngineInstanceLabel.getServiceInstance} resource"
)
resourceLockService.unLock(instanceLock)
}
}