func()

in pkg/scheduler/context.go [631:699]


func (cc *ClusterContext) updateNode(nodeInfo *si.NodeInfo) {
	var partition *PartitionContext
	if p, ok := nodeInfo.Attributes[siCommon.NodePartition]; ok {
		partition = cc.GetPartition(p)
	} else {
		log.Log(log.SchedContext).Error("node partition not specified",
			zap.String("nodeID", nodeInfo.NodeID),
			zap.Stringer("nodeAction", nodeInfo.Action))
		return
	}

	if partition == nil {
		log.Log(log.SchedContext).Error("Failed to update node on non existing partition",
			zap.String("nodeID", nodeInfo.NodeID),
			zap.String("partitionName", nodeInfo.Attributes[siCommon.NodePartition]),
			zap.Stringer("nodeAction", nodeInfo.Action))
		return
	}

	node := partition.GetNode(nodeInfo.NodeID)
	if node == nil {
		log.Log(log.SchedContext).Error("Failed to update non existing node",
			zap.String("nodeID", nodeInfo.NodeID),
			zap.String("partitionName", nodeInfo.Attributes[siCommon.NodePartition]),
			zap.Stringer("nodeAction", nodeInfo.Action))
		return
	}

	switch nodeInfo.Action {
	case si.NodeInfo_UPDATE:
		if sr := nodeInfo.SchedulableResource; sr != nil {
			partition.updatePartitionResource(node.SetCapacity(resources.NewResourceFromProto(sr)))
		}
	case si.NodeInfo_DRAIN_NODE:
		if node.IsSchedulable() {
			// set the state to not schedulable
			node.SetSchedulable(false)
			metrics.GetSchedulerMetrics().IncDrainingNodes()
		}
	case si.NodeInfo_DRAIN_TO_SCHEDULABLE:
		if !node.IsSchedulable() {
			metrics.GetSchedulerMetrics().DecDrainingNodes()
			// set the state to schedulable
			node.SetSchedulable(true)
		}
	case si.NodeInfo_DECOMISSION:
		if !node.IsSchedulable() {
			metrics.GetSchedulerMetrics().DecDrainingNodes()
		}
		metrics.GetSchedulerMetrics().IncTotalDecommissionedNodes()
		// set the state to not schedulable then tell the partition to clean up
		node.SetSchedulable(false)
		released, confirmed := partition.removeNode(node.NodeID)
		node.SendNodeRemovedEvent()
		// notify the shim allocations have been released from node
		if len(released) != 0 {
			cc.notifyRMAllocationReleased(partition.RmID, partition.Name, released, si.TerminationType_STOPPED_BY_RM,
				fmt.Sprintf("Node %s Removed", node.NodeID))
		}
		for _, confirm := range confirmed {
			cc.notifyRMNewAllocation(partition.RmID, confirm)
		}
	default:
		log.Log(log.SchedContext).Debug("unknown action for node update",
			zap.String("nodeID", nodeInfo.NodeID),
			zap.String("partitionName", nodeInfo.Attributes[siCommon.NodePartition]),
			zap.Stringer("nodeAction", nodeInfo.Action))
	}
}