cluster-autoscaler/simulator/clustersnapshot/store/delta.go (361 lines of code) (raw):
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"fmt"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
// DeltaSnapshotStore is an implementation of ClusterSnapshotStore optimized for typical Cluster Autoscaler usage - (fork, add stuff, revert), repeated many times per loop.
//
// Complexity of some notable operations:
//
//	fork - O(1)
//	revert - O(1)
//	commit - O(n)
//	list all pods (no filtering) - O(n), cached
//	list all pods (with filtering) - O(n)
//	list node infos - O(n), cached
//
// Watch out for:
//
//	node deletions, pod additions & deletions - invalidates cache of current snapshot
//	(when forked affects delta, but not base.)
//	pod affinity - causes scheduler framework to list pods with non-empty selector,
//	so basic caching doesn't help.
type DeltaSnapshotStore struct {
	// data is the top of the delta-layer chain; Fork/Revert/Commit swap this pointer.
	data *internalDeltaSnapshotData
}
// deltaSnapshotStoreNodeLister adapts DeltaSnapshotStore to the scheduler framework's NodeInfoLister interface.
type deltaSnapshotStoreNodeLister DeltaSnapshotStore

// deltaSnapshotStoreStorageLister adapts DeltaSnapshotStore to the scheduler framework's StorageInfoLister interface.
type deltaSnapshotStoreStorageLister DeltaSnapshotStore
// internalDeltaSnapshotData is one layer of the delta chain. Each layer records
// changes relative to baseData; a nil baseData marks the bottom (unforked) layer.
type internalDeltaSnapshotData struct {
	baseData *internalDeltaSnapshotData

	// addedNodeInfoMap holds nodes added in this layer (absent from the base chain).
	addedNodeInfoMap map[string]*schedulerframework.NodeInfo
	// modifiedNodeInfoMap holds copy-on-write clones of base nodes mutated in this layer.
	modifiedNodeInfoMap map[string]*schedulerframework.NodeInfo
	// deletedNodeInfos masks base nodes deleted in this layer.
	deletedNodeInfos map[string]bool

	// Memoized views, invalidated via clearCaches/clearPodCaches.
	nodeInfoList                     []*schedulerframework.NodeInfo
	havePodsWithAffinity             []*schedulerframework.NodeInfo
	havePodsWithRequiredAntiAffinity []*schedulerframework.NodeInfo
	pvcNamespaceMap                  map[string]int
}
// newInternalDeltaSnapshotData returns an empty delta layer with no base.
func newInternalDeltaSnapshotData() *internalDeltaSnapshotData {
	d := &internalDeltaSnapshotData{}
	d.addedNodeInfoMap = map[string]*schedulerframework.NodeInfo{}
	d.modifiedNodeInfoMap = map[string]*schedulerframework.NodeInfo{}
	d.deletedNodeInfos = map[string]bool{}
	return d
}
// getNodeInfo resolves a node by name, walking down the delta chain.
// Returns false when the node is unknown or was deleted in this layer.
func (data *internalDeltaSnapshotData) getNodeInfo(name string) (*schedulerframework.NodeInfo, bool) {
	if data == nil {
		return nil, false
	}
	ni, ok := data.getNodeInfoLocal(name)
	if ok {
		return ni, true
	}
	if data.deletedNodeInfos[name] {
		// Deleted in this layer - the base must not be consulted.
		return nil, false
	}
	return data.baseData.getNodeInfo(name)
}
// getNodeInfoLocal looks the node up in this layer only (added or modified),
// without descending into the base chain.
func (data *internalDeltaSnapshotData) getNodeInfoLocal(name string) (*schedulerframework.NodeInfo, bool) {
	if data == nil {
		return nil, false
	}
	if added, ok := data.addedNodeInfoMap[name]; ok {
		return added, true
	}
	modified, ok := data.modifiedNodeInfoMap[name]
	return modified, ok
}
// getNodeInfoList returns the flattened node list for this layer,
// building and memoizing it on first use.
func (data *internalDeltaSnapshotData) getNodeInfoList() []*schedulerframework.NodeInfo {
	if data == nil {
		return nil
	}
	if data.nodeInfoList != nil {
		return data.nodeInfoList
	}
	data.nodeInfoList = data.buildNodeInfoList()
	return data.nodeInfoList
}
// buildNodeInfoList flattens the delta chain into a single node list:
// base nodes minus deletions, with modifications substituted, plus additions.
// Contains costly copying throughout the struct chain. Use wisely.
func (data *internalDeltaSnapshotData) buildNodeInfoList() []*schedulerframework.NodeInfo {
	base := data.baseData.getNodeInfoList()
	capacity := len(base) + len(data.addedNodeInfoMap)
	list := make([]*schedulerframework.NodeInfo, 0, capacity)
	if len(data.deletedNodeInfos) == 0 && len(data.modifiedNodeInfoMap) == 0 {
		// Fast path: the base list carries over untouched.
		list = append(list, base...)
	} else {
		for _, bni := range base {
			name := bni.Node().Name
			if data.deletedNodeInfos[name] {
				continue
			}
			if mni, ok := data.modifiedNodeInfoMap[name]; ok {
				list = append(list, mni)
			} else {
				list = append(list, bni)
			}
		}
	}
	for _, ani := range data.addedNodeInfoMap {
		list = append(list, ani)
	}
	return list
}
// addNode wraps a bare Node into a fresh NodeInfo and inserts it into this layer.
func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) error {
	ni := schedulerframework.NewNodeInfo()
	ni.SetNode(node)
	return data.addNodeInfo(ni)
}
// addNodeInfo inserts a new node into this delta layer. It fails if a node
// with the same name is already visible anywhere in the chain.
func (data *internalDeltaSnapshotData) addNodeInfo(nodeInfo *schedulerframework.NodeInfo) error {
	name := nodeInfo.Node().Name
	if _, ok := data.getNodeInfo(name); ok {
		return fmt.Errorf("node %s already in snapshot", name)
	}
	if data.deletedNodeInfos[name] {
		// Re-adding a node deleted in this layer: record it as a modification
		// of the base node rather than a brand-new addition.
		delete(data.deletedNodeInfos, name)
		data.modifiedNodeInfoMap[name] = nodeInfo
	} else {
		data.addedNodeInfoMap[name] = nodeInfo
	}
	if data.nodeInfoList != nil {
		// Keep the memoized node list in sync instead of invalidating it.
		data.nodeInfoList = append(data.nodeInfoList, nodeInfo)
	}
	if len(nodeInfo.Pods) > 0 {
		// New pods may affect affinity/PVC views.
		data.clearPodCaches()
	}
	return nil
}
// clearCaches invalidates every memoized view held by this layer.
func (data *internalDeltaSnapshotData) clearCaches() {
	data.nodeInfoList = nil
	data.clearPodCaches()
}
// clearPodCaches invalidates only the pod-derived memoized views,
// leaving the node list cache intact.
func (data *internalDeltaSnapshotData) clearPodCaches() {
	// TODO: update the cache when adding/removing pods instead of invalidating the whole cache
	data.pvcNamespaceMap = nil
	data.havePodsWithAffinity = nil
	data.havePodsWithRequiredAntiAffinity = nil
}
// removeNodeInfo drops the named node from this layer. Local changes
// (additions/modifications) are discarded; a node present in the base is
// masked via the deleted set. Returns ErrNodeNotFound when the node is not
// visible in the chain or was already deleted here.
func (data *internalDeltaSnapshotData) removeNodeInfo(nodeName string) error {
	_, addedHere := data.addedNodeInfoMap[nodeName]
	if addedHere {
		// Undo an addition made in this delta.
		delete(data.addedNodeInfoMap, nodeName)
	}
	if _, modifiedHere := data.modifiedNodeInfoMap[nodeName]; modifiedHere {
		// Undo a modification made in this delta.
		delete(data.modifiedNodeInfoMap, nodeName)
	}
	if data.deletedNodeInfos[nodeName] {
		// Already deleted in this delta - treat as missing.
		return clustersnapshot.ErrNodeNotFound
	}
	_, inBase := data.baseData.getNodeInfo(nodeName)
	if inBase {
		// Mask the base node in this layer.
		data.deletedNodeInfos[nodeName] = true
	} else if !addedHere {
		// Node not found anywhere in the chain.
		return clustersnapshot.ErrNodeNotFound
	}
	// Maybe consider deleting from the lists instead. Maybe not.
	data.clearCaches()
	return nil
}
// nodeInfoToModify returns a NodeInfo owned by this layer that is safe to
// mutate, cloning it from the base on first modification (copy-on-write).
// Returns false when the node is not visible in the chain.
func (data *internalDeltaSnapshotData) nodeInfoToModify(nodeName string) (*schedulerframework.NodeInfo, bool) {
	if local, ok := data.getNodeInfoLocal(nodeName); ok {
		// Already owned by this layer - mutate in place.
		return local, true
	}
	if data.deletedNodeInfos[nodeName] {
		return nil, false
	}
	bni, ok := data.baseData.getNodeInfo(nodeName)
	if !ok {
		return nil, false
	}
	// First local modification: clone the base node into this layer.
	clone := bni.Snapshot()
	data.modifiedNodeInfoMap[nodeName] = clone
	data.clearCaches()
	return clone, true
}
// addPod schedules the given pod onto nodeName within this layer.
// Returns ErrNodeNotFound when the node is not visible in the chain.
func (data *internalDeltaSnapshotData) addPod(pod *apiv1.Pod, nodeName string) error {
	ni, ok := data.nodeInfoToModify(nodeName)
	if !ok {
		return clustersnapshot.ErrNodeNotFound
	}
	ni.AddPod(pod)
	// Maybe consider deleting from the list in the future. Maybe not.
	data.clearCaches()
	return nil
}
// removePod removes the pod identified by namespace/name from nodeName.
// This always clones node info, even if the pod is actually missing.
// Not sure if we mind, since removing non-existent pod
// probably means things are very bad anyway.
func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName string) error {
	ni, ok := data.nodeInfoToModify(nodeName)
	if !ok {
		return clustersnapshot.ErrNodeNotFound
	}
	logger := klog.Background()
	removed := false
	for _, pi := range ni.Pods {
		if pi.Pod.Namespace != namespace || pi.Pod.Name != name {
			continue
		}
		if err := ni.RemovePod(logger, pi.Pod); err != nil {
			return fmt.Errorf("cannot remove pod; %v", err)
		}
		removed = true
		break
	}
	if !removed {
		return fmt.Errorf("pod %s/%s not in snapshot", namespace, name)
	}
	// Maybe consider deleting from the list in the future. Maybe not.
	data.clearCaches()
	return nil
}
// isPVCUsedByPods reports whether any pod in the snapshot references the PVC
// identified by key. Per-PVC reference counts are aggregated lazily over all
// nodes and memoized until the next pod-affecting change.
func (data *internalDeltaSnapshotData) isPVCUsedByPods(key string) bool {
	if data.pvcNamespaceMap == nil {
		counts := make(map[string]int)
		for _, ni := range data.getNodeInfoList() {
			for pvcKey, refs := range ni.PVCRefCounts {
				counts[pvcKey] += refs
			}
		}
		data.pvcNamespaceMap = counts
	}
	return data.pvcNamespaceMap[key] > 0
}
// fork stacks a fresh, empty delta layer on top of this one.
func (data *internalDeltaSnapshotData) fork() *internalDeltaSnapshotData {
	child := newInternalDeltaSnapshotData()
	child.baseData = data
	return child
}
// commit replays this layer's changes onto its base and returns the base
// layer. A layer without a base is returned unchanged.
func (data *internalDeltaSnapshotData) commit() (*internalDeltaSnapshotData, error) {
	if data.baseData == nil {
		// do nothing... as in basic snapshot.
		return data, nil
	}
	// Order matters: apply deletions first, then modifications
	// (remove + re-add), then additions.
	for name := range data.deletedNodeInfos {
		if err := data.baseData.removeNodeInfo(name); err != nil {
			return nil, err
		}
	}
	for _, ni := range data.modifiedNodeInfoMap {
		if err := data.baseData.removeNodeInfo(ni.Node().Name); err != nil {
			return nil, err
		}
		if err := data.baseData.addNodeInfo(ni); err != nil {
			return nil, err
		}
	}
	for _, ni := range data.addedNodeInfoMap {
		if err := data.baseData.addNodeInfo(ni); err != nil {
			return nil, err
		}
	}
	return data.baseData, nil
}
// List returns list of all node infos.
func (snapshot *deltaSnapshotStoreNodeLister) List() ([]*schedulerframework.NodeInfo, error) {
	nodes := snapshot.data.getNodeInfoList()
	return nodes, nil
}
// HavePodsWithAffinityList returns the list of node infos hosting at least one
// pod with affinity constraints. The result is memoized on the current layer.
func (snapshot *deltaSnapshotStoreNodeLister) HavePodsWithAffinityList() ([]*schedulerframework.NodeInfo, error) {
	data := snapshot.data
	if data.havePodsWithAffinity != nil {
		return data.havePodsWithAffinity, nil
	}
	all := data.getNodeInfoList()
	filtered := make([]*schedulerframework.NodeInfo, 0, len(all))
	for _, ni := range all {
		if len(ni.PodsWithAffinity) > 0 {
			filtered = append(filtered, ni)
		}
	}
	data.havePodsWithAffinity = filtered
	return filtered, nil
}
// HavePodsWithRequiredAntiAffinityList returns the list of NodeInfos of nodes
// hosting pods with required anti-affinity terms. The result is memoized on
// the current layer.
func (snapshot *deltaSnapshotStoreNodeLister) HavePodsWithRequiredAntiAffinityList() ([]*schedulerframework.NodeInfo, error) {
	data := snapshot.data
	if data.havePodsWithRequiredAntiAffinity != nil {
		return data.havePodsWithRequiredAntiAffinity, nil
	}
	all := data.getNodeInfoList()
	filtered := make([]*schedulerframework.NodeInfo, 0, len(all))
	for _, ni := range all {
		if len(ni.PodsWithRequiredAntiAffinity) > 0 {
			filtered = append(filtered, ni)
		}
	}
	data.havePodsWithRequiredAntiAffinity = filtered
	return filtered, nil
}
// Get returns node info by node name.
func (snapshot *deltaSnapshotStoreNodeLister) Get(nodeName string) (*schedulerframework.NodeInfo, error) {
	store := (*DeltaSnapshotStore)(snapshot)
	return store.getNodeInfo(nodeName)
}
// IsPVCUsedByPods returns if PVC is used by pods
func (snapshot *deltaSnapshotStoreStorageLister) IsPVCUsedByPods(key string) bool {
	store := (*DeltaSnapshotStore)(snapshot)
	return store.IsPVCUsedByPods(key)
}
// getNodeInfo looks the node up in the current delta chain, translating a
// miss into ErrNodeNotFound.
func (snapshot *DeltaSnapshotStore) getNodeInfo(nodeName string) (*schedulerframework.NodeInfo, error) {
	ni, ok := snapshot.data.getNodeInfo(nodeName)
	if !ok {
		return nil, clustersnapshot.ErrNodeNotFound
	}
	return ni, nil
}
// NodeInfos returns node lister.
func (snapshot *DeltaSnapshotStore) NodeInfos() schedulerframework.NodeInfoLister {
	// The lister is just a view over the same store.
	return (*deltaSnapshotStoreNodeLister)(snapshot)
}
// StorageInfos returns storage lister.
func (snapshot *DeltaSnapshotStore) StorageInfos() schedulerframework.StorageInfoLister {
	// The lister is just a view over the same store.
	return (*deltaSnapshotStoreStorageLister)(snapshot)
}
// NewDeltaSnapshotStore creates instances of DeltaSnapshotStore.
func NewDeltaSnapshotStore() *DeltaSnapshotStore {
	store := &DeltaSnapshotStore{}
	store.clear()
	return store
}
// GetNodeInfo gets a NodeInfo, wrapped in the autoscaler framework type.
func (snapshot *DeltaSnapshotStore) GetNodeInfo(nodeName string) (*framework.NodeInfo, error) {
	ni, err := snapshot.getNodeInfo(nodeName)
	if err != nil {
		return nil, err
	}
	return framework.WrapSchedulerNodeInfo(ni), nil
}
// ListNodeInfos lists NodeInfos, wrapped in the autoscaler framework type.
func (snapshot *DeltaSnapshotStore) ListNodeInfos() ([]*framework.NodeInfo, error) {
	return framework.WrapSchedulerNodeInfos(snapshot.data.getNodeInfoList()), nil
}
// AddNodeInfo adds a NodeInfo: the node itself first, then each of its pods.
func (snapshot *DeltaSnapshotStore) AddNodeInfo(nodeInfo *framework.NodeInfo) error {
	node := nodeInfo.Node()
	if err := snapshot.data.addNode(node); err != nil {
		return err
	}
	for _, podInfo := range nodeInfo.Pods() {
		if err := snapshot.data.addPod(podInfo.Pod, node.Name); err != nil {
			return err
		}
	}
	return nil
}
// SetClusterState resets the snapshot to the given nodes and schedules the
// given pods onto them. Pods assigned to unknown nodes are silently skipped.
func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error {
	snapshot.clear()

	nodeNames := make(map[string]bool, len(nodes))
	for _, node := range nodes {
		if err := snapshot.data.addNode(node); err != nil {
			return err
		}
		nodeNames[node.Name] = true
	}
	for _, pod := range scheduledPods {
		if !nodeNames[pod.Spec.NodeName] {
			// Pod is scheduled to a node we don't know about - drop it.
			continue
		}
		if err := snapshot.data.addPod(pod, pod.Spec.NodeName); err != nil {
			return err
		}
	}
	return nil
}
// RemoveNodeInfo removes nodes (and pods scheduled to it) from the snapshot.
func (snapshot *DeltaSnapshotStore) RemoveNodeInfo(nodeName string) error {
	// Delegates to the current delta layer.
	return snapshot.data.removeNodeInfo(nodeName)
}
// ForceAddPod adds pod to the snapshot and schedules it to given node.
func (snapshot *DeltaSnapshotStore) ForceAddPod(pod *apiv1.Pod, nodeName string) error {
	// Delegates to the current delta layer.
	return snapshot.data.addPod(pod, nodeName)
}
// ForceRemovePod removes pod from the snapshot.
func (snapshot *DeltaSnapshotStore) ForceRemovePod(namespace, podName, nodeName string) error {
	// Delegates to the current delta layer.
	return snapshot.data.removePod(namespace, podName, nodeName)
}
// IsPVCUsedByPods returns if the pvc is used by any pod
func (snapshot *DeltaSnapshotStore) IsPVCUsedByPods(key string) bool {
	// Delegates to the current delta layer.
	return snapshot.data.isPVCUsedByPods(key)
}
// Fork creates a fork of snapshot state. All modifications can later be reverted to moment of forking via Revert()
// Time: O(1)
func (snapshot *DeltaSnapshotStore) Fork() {
	// Push a new empty layer on top of the chain.
	snapshot.data = snapshot.data.fork()
}
// Revert reverts snapshot state to moment of forking.
// Time: O(1)
func (snapshot *DeltaSnapshotStore) Revert() {
	base := snapshot.data.baseData
	if base == nil {
		// Already at the bottom layer (never forked) - nothing to revert.
		return
	}
	snapshot.data = base
}
// Commit commits changes done after forking.
// Time: O(n), where n = size of delta (number of nodes added, modified or deleted since forking)
func (snapshot *DeltaSnapshotStore) Commit() error {
	merged, err := snapshot.data.commit()
	if err != nil {
		return err
	}
	snapshot.data = merged
	return nil
}
// clear resets the cluster snapshot to an empty, unforked state.
// Time: O(1)
func (snapshot *DeltaSnapshotStore) clear() {
	snapshot.data = newInternalDeltaSnapshotData()
}