in pkg/scheduler/context.go [631:699]
// updateNode applies a node update sent by the resource manager to a node that
// is already registered in a partition.
//
// The target partition is taken from the node's siCommon.NodePartition
// attribute. The update is dropped, with an error log, when the attribute is
// missing, the partition does not exist, or the node is not known to the
// partition.
//
// Handled actions:
//   - UPDATE: when a schedulable resource is supplied, set it as the node
//     capacity and pass the result of SetCapacity to the partition.
//   - DRAIN_NODE: mark a schedulable node unschedulable and bump the
//     draining-nodes metric.
//   - DRAIN_TO_SCHEDULABLE: mark an unschedulable node schedulable again and
//     lower the draining-nodes metric.
//   - DECOMISSION: mark the node unschedulable, remove it from the partition,
//     send the node-removed event, then notify the RM about the released and
//     confirmed allocations returned by the removal.
//
// Any other action is logged at debug level and ignored.
func (cc *ClusterContext) updateNode(nodeInfo *si.NodeInfo) {
	partitionName, hasPartition := nodeInfo.Attributes[siCommon.NodePartition]
	if !hasPartition {
		log.Log(log.SchedContext).Error("node partition not specified",
			zap.String("nodeID", nodeInfo.NodeID),
			zap.Stringer("nodeAction", nodeInfo.Action))
		return
	}

	partition := cc.GetPartition(partitionName)
	if partition == nil {
		log.Log(log.SchedContext).Error("Failed to update node on non existing partition",
			zap.String("nodeID", nodeInfo.NodeID),
			zap.String("partitionName", partitionName),
			zap.Stringer("nodeAction", nodeInfo.Action))
		return
	}

	node := partition.GetNode(nodeInfo.NodeID)
	if node == nil {
		log.Log(log.SchedContext).Error("Failed to update non existing node",
			zap.String("nodeID", nodeInfo.NodeID),
			zap.String("partitionName", partitionName),
			zap.Stringer("nodeAction", nodeInfo.Action))
		return
	}

	switch nodeInfo.Action {
	case si.NodeInfo_UPDATE:
		// A nil schedulable resource means "capacity unchanged".
		if schedulable := nodeInfo.SchedulableResource; schedulable != nil {
			newCapacity := resources.NewResourceFromProto(schedulable)
			partition.updatePartitionResource(node.SetCapacity(newCapacity))
		}

	case si.NodeInfo_DRAIN_NODE:
		// Only a schedulable node enters the draining state, so a repeated
		// drain request does not double count the metric.
		if node.IsSchedulable() {
			node.SetSchedulable(false)
			metrics.GetSchedulerMetrics().IncDrainingNodes()
		}

	case si.NodeInfo_DRAIN_TO_SCHEDULABLE:
		// Only an unschedulable (draining) node is restored; the metric is
		// lowered before the node flips back to schedulable.
		if !node.IsSchedulable() {
			metrics.GetSchedulerMetrics().DecDrainingNodes()
			node.SetSchedulable(true)
		}

	case si.NodeInfo_DECOMISSION:
		// An unschedulable node is treated as draining here; it stops counting
		// towards the draining metric once it is decommissioned.
		if !node.IsSchedulable() {
			metrics.GetSchedulerMetrics().DecDrainingNodes()
		}
		metrics.GetSchedulerMetrics().IncTotalDecommissionedNodes()

		// Block further scheduling before the partition cleans the node up.
		node.SetSchedulable(false)
		releasedAllocs, confirmedAllocs := partition.removeNode(node.NodeID)
		node.SendNodeRemovedEvent()

		// Tell the shim which allocations were released by the node removal.
		if len(releasedAllocs) > 0 {
			cc.notifyRMAllocationReleased(partition.RmID, partition.Name, releasedAllocs, si.TerminationType_STOPPED_BY_RM,
				fmt.Sprintf("Node %s Removed", node.NodeID))
		}
		for _, alloc := range confirmedAllocs {
			cc.notifyRMNewAllocation(partition.RmID, alloc)
		}

	default:
		log.Log(log.SchedContext).Debug("unknown action for node update",
			zap.String("nodeID", nodeInfo.NodeID),
			zap.String("partitionName", partitionName),
			zap.Stringer("nodeAction", nodeInfo.Action))
	}
}