cluster-autoscaler/cloudprovider/oci/nodepools/oci_manager.go
/*
Copyright 2020-2023 Oracle and/or its affiliates.
*/
package nodepools
import (
"context"
"fmt"
"math"
"os"
"regexp"
"strconv"
"strings"
"time"
"github.com/pkg/errors"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/client-go/kubernetes"
klog "k8s.io/klog/v2"
ocicommon "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/oci/common"
ipconsts "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/oci/instancepools/consts"
npconsts "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/oci/nodepools/consts"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/oci/vendor-internal/github.com/oracle/oci-go-sdk/v65/common"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/oci/vendor-internal/github.com/oracle/oci-go-sdk/v65/common/auth"
oke "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/oci/vendor-internal/github.com/oracle/oci-go-sdk/v65/containerengine"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/oci/vendor-internal/github.com/oracle/oci-go-sdk/v65/core"
)
const (
maxAddTaintRetries = 5
maxGetNodepoolRetries = 3
clusterId = "clusterId"
compartmentId = "compartmentId"
nodepoolTags = "nodepoolTags"
min = "min"
max = "max"
)
var (
maxRetryDeadline time.Duration = 5 * time.Second
conflictRetryInterval time.Duration = 750 * time.Millisecond
errInstanceNodePoolNotFound = errors.New("node pool not found for instance")
)
// NodePoolManager defines the operations required for a *node pool based* autoscaler.
type NodePoolManager interface {
// Refresh triggers refresh of cached resources.
Refresh() error
// Cleanup cleans up open resources before the cloud provider is destroyed, e.g. goroutines.
Cleanup() error
// GetNodePools returns the list of registered NodePools.
GetNodePools() []NodePool
// GetNodePoolNodes returns NodePool nodes.
GetNodePoolNodes(np NodePool) ([]cloudprovider.Instance, error)
// GetExistingNodePoolSizeViaCompute returns the number of non-terminal instances in the NodePool, as reported by the Compute API.
GetExistingNodePoolSizeViaCompute(np NodePool) (int, error)
// GetNodePoolForInstance returns NodePool to which the given instance belongs.
GetNodePoolForInstance(instance ocicommon.OciRef) (NodePool, error)
// GetNodePoolTemplateNode returns a template node for NodePool.
GetNodePoolTemplateNode(np NodePool) (*apiv1.Node, error)
// GetNodePoolSize gets NodePool size.
GetNodePoolSize(np NodePool) (int, error)
// SetNodePoolSize sets NodePool size.
SetNodePoolSize(np NodePool, size int) error
// DeleteInstances deletes the given instances. All instances must be controlled by the same NodePool.
DeleteInstances(np NodePool, instances []ocicommon.OciRef) error
// InvalidateAndRefreshCache invalidates the node pool cache and refreshes it.
InvalidateAndRefreshCache() error
// TaintToPreventFurtherSchedulingOnRestart taints nodes with ToBeDeletedByClusterAutoscaler so that an unexpected CA restart does not schedule pods onto nodes that were about to be deleted.
TaintToPreventFurtherSchedulingOnRestart(nodes []*apiv1.Node, client kubernetes.Interface) error
}
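// okeClient is the subset of the OKE ContainerEngine API operations used by the node pool manager.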
type okeClient interface {
GetNodePool(context.Context, oke.GetNodePoolRequest) (oke.GetNodePoolResponse, error)
UpdateNodePool(context.Context, oke.UpdateNodePoolRequest) (oke.UpdateNodePoolResponse, error)
DeleteNode(context.Context, oke.DeleteNodeRequest) (oke.DeleteNodeResponse, error)
ListNodePools(ctx context.Context, request oke.ListNodePoolsRequest) (oke.ListNodePoolsResponse, error)
}
// CreateNodePoolManager creates a NodePoolManager that can manage autoscaling node pools
func CreateNodePoolManager(cloudConfigPath string, nodeGroupAutoDiscoveryList []string, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, kubeClient kubernetes.Interface) (NodePoolManager, error) {
var err error
var configProvider common.ConfigurationProvider
if os.Getenv(ipconsts.OciUseWorkloadIdentityEnvVar) == "true" {
klog.Info("using workload identity provider")
configProvider, err = auth.OkeWorkloadIdentityConfigurationProvider()
if err != nil {
return nil, err
}
} else if os.Getenv(ipconsts.OciUseInstancePrincipalEnvVar) == "true" || os.Getenv(npconsts.OkeUseInstancePrincipalEnvVar) == "true" {
klog.Info("using instance principal provider")
configProvider, err = auth.InstancePrincipalConfigurationProvider()
if err != nil {
return nil, err
}
} else {
klog.Info("using default configuration provider")
configProvider = common.DefaultConfigProvider()
}
cloudConfig, err := ocicommon.CreateCloudConfig(cloudConfigPath, configProvider, npconsts.OciNodePoolResourceIdent)
if err != nil {
return nil, err
}
clientConfig := common.CustomClientConfiguration{
RetryPolicy: ocicommon.NewRetryPolicy(),
}
okeClient, err := oke.NewContainerEngineClientWithConfigurationProvider(configProvider)
if err != nil {
return nil, errors.Wrap(err, "unable to create oke client")
}
okeClient.SetCustomClientConfiguration(clientConfig)
// undocumented endpoint for testing in dev
if os.Getenv(npconsts.OkeHostOverrideEnvVar) != "" {
okeClient.BaseClient.Host = os.Getenv(npconsts.OkeHostOverrideEnvVar)
}
// node pools don't need the compute management client, but create it anyway since the shared shape getter client expects one
computeMgmtClient, err := core.NewComputeManagementClientWithConfigurationProvider(configProvider)
if err != nil {
return nil, errors.Wrap(err, "unable to create compute management client")
}
computeMgmtClient.SetCustomClientConfiguration(clientConfig)
computeClient, err := core.NewComputeClientWithConfigurationProvider(configProvider)
if err != nil {
return nil, errors.Wrap(err, "unable to create compute client")
}
computeClient.SetCustomClientConfiguration(clientConfig)
ociShapeGetter := ocicommon.CreateShapeGetter(ocicommon.ShapeClientImpl{ComputeMgmtClient: computeMgmtClient, ComputeClient: computeClient})
ociTagsGetter := ocicommon.CreateTagsGetter()
registeredTaintsGetter := CreateRegisteredTaintsGetter()
manager := &ociManagerImpl{
cfg: cloudConfig,
okeClient: &okeClient,
computeClient: &computeClient,
staticNodePools: map[string]NodePool{},
ociShapeGetter: ociShapeGetter,
ociTagsGetter: ociTagsGetter,
registeredTaintsGetter: registeredTaintsGetter,
nodePoolCache: newNodePoolCache(&okeClient),
}
// auto-discover node pools in the compartments specified by the node-group-auto-discovery parameters
klog.Infof("checking node groups for autodiscovery ... ")
for _, arg := range nodeGroupAutoDiscoveryList {
nodeGroup, err := nodeGroupFromArg(arg)
if err != nil {
return nil, fmt.Errorf("unable to construct node group auto discovery from argument: %v", err)
}
nodeGroup.manager = manager
nodeGroup.kubeClient = kubeClient
manager.nodeGroups = append(manager.nodeGroups, *nodeGroup)
autoDiscoverNodeGroups(manager, manager.okeClient, *nodeGroup)
}
// Register the statically configured node pools from the node group specs passed in via the args.
for _, arg := range discoveryOpts.NodeGroupSpecs {
np, err := nodePoolFromArg(arg)
if err != nil {
return nil, fmt.Errorf("unable to construct node pool from argument: %v", err)
}
np.manager = manager
np.kubeClient = kubeClient
manager.staticNodePools[np.Id()] = np
}
// wait until we have an initial full cache.
wait.PollImmediateInfinite(
10*time.Second,
func() (bool, error) {
err := manager.Refresh()
if err != nil {
klog.Errorf("unable to fill cache on startup. Retrying: %+v", err)
return false, nil
}
return true, nil
})
return manager, nil
}
func autoDiscoverNodeGroups(m *ociManagerImpl, okeClient okeClient, nodeGroup nodeGroupAutoDiscovery) (bool, error) {
var resp, reqErr = okeClient.ListNodePools(context.Background(), oke.ListNodePoolsRequest{
ClusterId: common.String(nodeGroup.clusterId),
CompartmentId: common.String(nodeGroup.compartmentId),
})
if reqErr != nil {
klog.Errorf("failed to fetch the nodepool list with clusterId: %s, compartmentId: %s. Error: %v", nodeGroup.clusterId, nodeGroup.compartmentId, reqErr)
return false, reqErr
}
for _, nodePoolSummary := range resp.Items {
if validateNodepoolTags(nodeGroup.tags, nodePoolSummary.FreeformTags, nodePoolSummary.DefinedTags) {
nodepool := &nodePool{}
nodepool.id = *nodePoolSummary.Id
nodepool.minSize = nodeGroup.minSize
nodepool.maxSize = nodeGroup.maxSize
nodepool.manager = nodeGroup.manager
nodepool.kubeClient = nodeGroup.kubeClient
m.staticNodePools[nodepool.id] = nodepool
klog.V(5).Infof("auto discovered nodepool in compartment : %s , nodepoolid: %s", nodeGroup.compartmentId, nodepool.id)
} else {
klog.Warningf("nodepool ignored as the tags do not satisfy the requirement : %s , %v, %v", *nodePoolSummary.Id, nodePoolSummary.FreeformTags, nodePoolSummary.DefinedTags)
}
}
return true, nil
}
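// validateNodepoolTags reports whether a node pool's tags satisfy every tag required by the node group.
// A required key of the form "namespace.key" is matched against the node pool's defined tags; any other
// key is matched against its freeform tags. Illustrative (hypothetical) example: requiring "ns.team=dev"
// checks definedTags["ns"]["team"] == "dev", while requiring "ca-managed=true" checks
// freeFormTags["ca-managed"] == "true".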
func validateNodepoolTags(nodeGroupTags map[string]string, freeFormTags map[string]string, definedTags map[string]map[string]interface{}) bool {
if nodeGroupTags != nil {
for tagKey, tagValue := range nodeGroupTags {
namespacedTagKey := strings.Split(tagKey, ".")
if len(namespacedTagKey) == 2 && tagValue != definedTags[namespacedTagKey[0]][namespacedTagKey[1]] {
return false
} else if len(namespacedTagKey) != 2 && tagValue != freeFormTags[tagKey] {
return false
}
}
}
return true
}
// nodePoolFromArg parses a node group spec represented in the form of `<minSize>:<maxSize>:<ocid>` and produces a node group spec object
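//
// Example (illustrative; the OCID below is a hypothetical placeholder):
//
//	np, err := nodePoolFromArg("1:5:ocid1.nodepool.oc1..aaaa")
//	// np.minSize == 1, np.maxSize == 5, np.id == "ocid1.nodepool.oc1..aaaa"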
func nodePoolFromArg(value string) (*nodePool, error) {
tokens := strings.SplitN(value, ":", 3)
if len(tokens) != 3 {
return nil, fmt.Errorf("wrong nodes configuration: %s", value)
}
spec := &nodePool{}
if size, err := strconv.Atoi(tokens[0]); err == nil {
spec.minSize = size
} else {
return nil, fmt.Errorf("failed to set min size: %s, expected integer", tokens[0])
}
if size, err := strconv.Atoi(tokens[1]); err == nil {
spec.maxSize = size
} else {
return nil, fmt.Errorf("failed to set max size: %s, expected integer", tokens[1])
}
spec.id = tokens[2]
klog.Infof("static node spec constructed: %+v", spec)
return spec, nil
}
// nodeGroupFromArg parses a node group spec represented in the form of
// `clusterId:<clusterId>,compartmentId:<compartmentId>,nodepoolTags:<tagKey1>=<tagValue1>&<tagKey2>=<tagValue2>,min:<min>,max:<max>`
// and produces a node group auto discovery object
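//
// Example (illustrative; the OCIDs and tags below are hypothetical placeholders):
//
//	spec, err := nodeGroupFromArg("clusterId:ocid1.cluster.oc1..aaaa,compartmentId:ocid1.compartment.oc1..bbbb,nodepoolTags:ca-managed=true&ns.team=dev,min:1,max:5")
//	// spec.minSize == 1, spec.maxSize == 5
//	// spec.tags == map[string]string{"ca-managed": "true", "ns.team": "dev"}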
func nodeGroupFromArg(value string) (*nodeGroupAutoDiscovery, error) {
// this regex finds the key:value pairs in any order; pairs are separated by commas and each key is separated from its value by a colon
regexPattern := `(?:` + compartmentId + `:(?P<` + compartmentId + `>[^,]+)`
regexPattern = regexPattern + `|` + nodepoolTags + `:(?P<` + nodepoolTags + `>[^,]+)`
regexPattern = regexPattern + `|` + max + `:(?P<` + max + `>[^,]+)`
regexPattern = regexPattern + `|` + min + `:(?P<` + min + `>[^,]+)`
regexPattern = regexPattern + `|` + clusterId + `:(?P<` + clusterId + `>[^,]+)`
regexPattern = regexPattern + `)(?:,|$)`
re := regexp.MustCompile(regexPattern)
parametersMap := make(map[string]string)
// push key-value pairs into a map
for _, match := range re.FindAllStringSubmatch(value, -1) {
for i, name := range re.SubexpNames() {
if i != 0 && match[i] != "" {
parametersMap[name] = match[i]
}
}
}
spec := &nodeGroupAutoDiscovery{}
if parametersMap[clusterId] != "" {
spec.clusterId = parametersMap[clusterId]
} else {
return nil, fmt.Errorf("failed to set %s, it is missing in node-group-auto-discovery parameter", clusterId)
}
if parametersMap[compartmentId] != "" {
spec.compartmentId = parametersMap[compartmentId]
} else {
return nil, fmt.Errorf("failed to set %s, it is missing in node-group-auto-discovery parameter", compartmentId)
}
if size, err := strconv.Atoi(parametersMap[min]); err == nil {
spec.minSize = size
} else {
return nil, fmt.Errorf("failed to set %s size: %s, expected integer", min, parametersMap[min])
}
if size, err := strconv.Atoi(parametersMap[max]); err == nil {
spec.maxSize = size
} else {
return nil, fmt.Errorf("failed to set %s size: %s, expected integer", max, parametersMap[max])
}
if parametersMap[nodepoolTags] != "" {
nodepoolTags := parametersMap[nodepoolTags]
spec.tags = make(map[string]string)
pairs := strings.Split(nodepoolTags, "&")
for _, pair := range pairs {
parts := strings.Split(pair, "=")
if len(parts) == 2 {
spec.tags[parts[0]] = parts[1]
} else {
return nil, fmt.Errorf("nodepoolTags should be given in tagKey=tagValue format, this is not valid: %s", pair)
}
}
} else {
return nil, fmt.Errorf("failed to set %s, it is missing in node-group-auto-discovery parameter", nodepoolTags)
}
klog.Infof("node group auto discovery spec constructed: %+v", spec)
return spec, nil
}
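// ociManagerImpl is the NodePoolManager implementation backed by the OKE and Compute clients.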
type ociManagerImpl struct {
cfg *ocicommon.CloudConfig
okeClient okeClient
computeClient *core.ComputeClient
ociShapeGetter ocicommon.ShapeGetter
ociTagsGetter ocicommon.TagsGetter
registeredTaintsGetter RegisteredTaintsGetter
staticNodePools map[string]NodePool
nodeGroups []nodeGroupAutoDiscovery
lastRefresh time.Time
// caches the node pool objects received from OKE.
// All interactions with OKE's API should go through the cache.
nodePoolCache *nodePoolCache
}
// Refresh triggers refresh of cached resources.
func (m *ociManagerImpl) Refresh() error {
if m.lastRefresh.Add(m.cfg.Global.RefreshInterval).After(time.Now()) {
return nil
}
return m.forceRefresh()
}
// InvalidateAndRefreshCache resets the refresh timer and forces a refresh
func (m *ociManagerImpl) InvalidateAndRefreshCache() error {
// set time to 0001-01-01 00:00:00 +0000 UTC
m.lastRefresh = time.Time{}
return m.Refresh()
}
// TaintToPreventFurtherSchedulingOnRestart adds a taint to prevent new pods from being scheduled onto the node.
// This fixes a race condition where a node deletion that does not complete in time is retried, and if that
// second delete fails, the node could become usable again. The taint prevents that from happening.
func (m *ociManagerImpl) TaintToPreventFurtherSchedulingOnRestart(nodes []*apiv1.Node, client kubernetes.Interface) error {
for _, node := range nodes {
taintErr := addTaint(node, client, npconsts.ToBeDeletedByClusterAutoscaler, apiv1.TaintEffectNoSchedule)
if taintErr != nil {
return taintErr
}
}
return nil
}
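// forceRefresh re-runs node group auto discovery (when configured), rebuilds the node pool cache and records the refresh time.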
func (m *ociManagerImpl) forceRefresh() error {
// auto discover node groups
if m.nodeGroups != nil {
// clear the previous node pool map before re-running auto discovery
m.staticNodePools = make(map[string]NodePool)
for _, nodeGroup := range m.nodeGroups {
autoDiscoverNodeGroups(m, m.okeClient, nodeGroup)
}
}
// rebuild nodepool cache
err := m.nodePoolCache.rebuild(m.staticNodePools, maxGetNodepoolRetries)
if err != nil {
return err
}
m.lastRefresh = time.Now()
klog.Infof("Refreshed NodePool list, next refresh after %v", m.lastRefresh.Add(m.cfg.Global.RefreshInterval))
return nil
}
// Cleanup cleans up open resources before the cloud provider is destroyed, e.g. goroutines.
func (m *ociManagerImpl) Cleanup() error {
return nil
}
// GetNodePools returns the list of registered NodePools.
func (m *ociManagerImpl) GetNodePools() []NodePool {
var nodePools []NodePool
for _, np := range m.staticNodePools {
nodePoolInCache := m.nodePoolCache.cache[np.Id()]
if nodePoolInCache != nil {
nodePools = append(nodePools, np)
}
}
return nodePools
}
// GetExistingNodePoolSizeViaCompute returns the number of node pool instances that are not in a terminal state, using the Compute ListInstances API.
// We do this to avoid any dependency on internal caching, so that we always have the latest node pool state.
func (m *ociManagerImpl) GetExistingNodePoolSizeViaCompute(np NodePool) (int, error) {
klog.V(4).Infof("getting nodes for node pool: %q", np.Id())
nodePoolDetails, err := m.nodePoolCache.get(np.Id())
if err != nil {
klog.V(4).Error(err, "error fetching detailed nodepool from cache")
return math.MaxInt32, err
}
request := core.ListInstancesRequest{
CompartmentId: nodePoolDetails.CompartmentId,
Limit: common.Int(500),
}
displayNamePrefix := getDisplayNamePrefix(*nodePoolDetails.ClusterId, *nodePoolDetails.Id)
klog.V(5).Infof("Filter used is prefix %q", displayNamePrefix)
listInstancesFunc := func(request core.ListInstancesRequest) (core.ListInstancesResponse, error) {
return m.computeClient.ListInstances(context.Background(), request)
}
var instances []cloudprovider.Instance
for r, err := listInstancesFunc(request); ; r, err = listInstancesFunc(request) {
if err != nil {
klog.V(5).Error(err, "error while performing listInstancesFunc call")
return math.MaxInt32, err
}
for _, item := range r.Items {
klog.V(6).Infof("checking instance %q (instance ocid: %q) in state %q", *item.DisplayName, *item.Id, item.LifecycleState)
if !strings.HasPrefix(*item.DisplayName, displayNamePrefix) {
continue
}
switch item.LifecycleState {
case core.InstanceLifecycleStateStopped, core.InstanceLifecycleStateTerminated:
klog.V(4).Infof("skipping instance is in stopped/terminated state: %q", *item.Id)
case core.InstanceLifecycleStateCreatingImage, core.InstanceLifecycleStateStarting, core.InstanceLifecycleStateProvisioning, core.InstanceLifecycleStateMoving:
instances = append(instances, cloudprovider.Instance{
Id: *item.Id,
Status: &cloudprovider.InstanceStatus{
State: cloudprovider.InstanceCreating,
},
})
// a running instance could either still be installing OKE software or already be a Ready node.
// we cannot tell which from here, but since we only need to know whether a node is stopped/terminated, it does not matter
case core.InstanceLifecycleStateRunning:
instances = append(instances, cloudprovider.Instance{
Id: *item.Id,
Status: &cloudprovider.InstanceStatus{
State: cloudprovider.InstanceRunning,
},
})
case core.InstanceLifecycleStateStopping, core.InstanceLifecycleStateTerminating:
instances = append(instances, cloudprovider.Instance{
Id: *item.Id,
Status: &cloudprovider.InstanceStatus{
State: cloudprovider.InstanceDeleting,
},
})
default:
klog.Warningf("instance found in unhandled state: (%q = %v)", *item.Id, item.LifecycleState)
}
}
// pagination logic
if r.OpcNextPage != nil {
// if there are more items in next page, fetch items from next page
request.Page = r.OpcNextPage
} else {
// no more results, break the loop
break
}
}
return len(instances), nil
}
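// getDisplayNamePrefix returns the display-name prefix used to match a node pool's worker instances:
// "oke-" followed by the last 11 characters of the cluster OCID, then "-" and the last 11 characters of
// the node pool OCID. Illustrative example with hypothetical OCID suffixes: "oke-aaaaaaaaaaa-bbbbbbbbbbb".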
func getDisplayNamePrefix(clusterId string, nodePoolId string) string {
shortNodePoolId := nodePoolId[len(nodePoolId)-11:]
shortClusterId := clusterId[len(clusterId)-11:]
return "oke" +
"-" + shortClusterId +
"-" + shortNodePoolId
}
// GetNodePoolNodes returns NodePool nodes that are not in a terminal state.
func (m *ociManagerImpl) GetNodePoolNodes(np NodePool) ([]cloudprovider.Instance, error) {
klog.V(4).Infof("getting nodes for node pool: %q", np.Id())
nodePool, err := m.nodePoolCache.get(np.Id())
if err != nil {
return nil, err
}
var instances []cloudprovider.Instance
for _, node := range nodePool.Nodes {
if node.NodeError != nil {
errorClass := cloudprovider.OtherErrorClass
if *node.NodeError.Code == "LimitExceeded" ||
(*node.NodeError.Code == "InternalServerError" &&
strings.Contains(*node.NodeError.Message, "quota")) {
errorClass = cloudprovider.OutOfResourcesErrorClass
}
instances = append(instances, cloudprovider.Instance{
Id: *node.Id,
Status: &cloudprovider.InstanceStatus{
State: cloudprovider.InstanceCreating,
ErrorInfo: &cloudprovider.InstanceErrorInfo{
ErrorClass: errorClass,
ErrorCode: *node.NodeError.Code,
ErrorMessage: *node.NodeError.Message,
},
},
})
continue
}
switch node.LifecycleState {
case oke.NodeLifecycleStateDeleted:
klog.V(4).Infof("skipping instance is in deleted state: %q", *node.Id)
case oke.NodeLifecycleStateDeleting:
instances = append(instances, cloudprovider.Instance{
Id: *node.Id,
Status: &cloudprovider.InstanceStatus{
State: cloudprovider.InstanceDeleting,
},
})
case oke.NodeLifecycleStateCreating, oke.NodeLifecycleStateUpdating:
instances = append(instances, cloudprovider.Instance{
Id: *node.Id,
Status: &cloudprovider.InstanceStatus{
State: cloudprovider.InstanceCreating,
},
})
case oke.NodeLifecycleStateActive:
instances = append(instances, cloudprovider.Instance{
Id: *node.Id,
Status: &cloudprovider.InstanceStatus{
State: cloudprovider.InstanceRunning,
},
})
default:
klog.Warningf("instance found in unhandled state: (%q = %v)", *node.Id, node.LifecycleState)
}
}
return instances, nil
}
// GetNodePoolForInstance returns NodePool to which the given instance belongs.
func (m *ociManagerImpl) GetNodePoolForInstance(instance ocicommon.OciRef) (NodePool, error) {
if instance.NodePoolID == "" {
klog.V(4).Infof("node pool id missing from reference: %+v", instance)
// we're looking up an unregistered node, so we can't use node pool id.
nodePool, err := m.nodePoolCache.getByInstance(instance.InstanceID)
if err != nil {
return nil, err
}
return m.staticNodePools[*nodePool.Id], nil
}
np, found := m.staticNodePools[instance.NodePoolID]
if !found {
klog.V(4).Infof("did not find node pool for reference: %+v", instance)
return nil, errInstanceNodePoolNotFound
}
return np, nil
}
// GetNodePoolTemplateNode returns a template node for NodePool.
func (m *ociManagerImpl) GetNodePoolTemplateNode(np NodePool) (*apiv1.Node, error) {
nodePool, err := m.nodePoolCache.get(np.Id())
if err != nil {
return nil, err
}
node, err := m.buildNodeFromTemplate(nodePool)
if err != nil {
return nil, err
}
return node, nil
}
// GetNodePoolSize gets NodePool size.
func (m *ociManagerImpl) GetNodePoolSize(np NodePool) (int, error) {
return m.nodePoolCache.getSize(np.Id())
}
// SetNodePoolSize sets NodePool size.
func (m *ociManagerImpl) SetNodePoolSize(np NodePool, size int) error {
err := m.nodePoolCache.setSize(np.Id(), size)
if err != nil {
return err
}
// We do not wait for the work request to finish or nodes become active on purpose. This allows
// the autoscaler to make decisions quicker especially since the autoscaler is aware of
// unregistered nodes in addition to registered nodes.
return nil
}
// DeleteInstances deletes the given instances. All instances must be controlled by the same NodePool.
func (m *ociManagerImpl) DeleteInstances(np NodePool, instances []ocicommon.OciRef) error {
klog.Infof("DeleteInstances called")
for _, instance := range instances {
err := m.nodePoolCache.removeInstance(np.Id(), instance.InstanceID, instance.Name)
if err != nil {
return err
}
}
return nil
}
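// buildNodeFromTemplate constructs a template node for the node pool from its initial node labels,
// registered taints and shape-derived capacity (pods, CPU, memory, GPU and, when tagged, ephemeral storage).
// The template is what the autoscaler reasons about when no live node is available to inspect,
// e.g. when scaling a node pool up from zero.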
func (m *ociManagerImpl) buildNodeFromTemplate(nodePool *oke.NodePool) (*apiv1.Node, error) {
node := apiv1.Node{}
nodeName := fmt.Sprintf("%s-%d", "ok", 555555)
node.ObjectMeta = metav1.ObjectMeta{
Name: nodeName,
Labels: map[string]string{},
}
// Add all the initial node labels from the NodePool configuration to the
// templated node.
for _, kv := range nodePool.InitialNodeLabels {
node.ObjectMeta.Labels[*kv.Key] = *kv.Value
}
node.Status = apiv1.NodeStatus{
Capacity: apiv1.ResourceList{},
}
freeformTags, err := m.ociTagsGetter.GetNodePoolFreeformTags(nodePool)
if err != nil {
return nil, err
}
ephemeralStorage, err := getEphemeralResourceRequestsInBytes(freeformTags)
if err != nil {
klog.Error(err)
}
shape, err := m.ociShapeGetter.GetNodePoolShape(nodePool, ephemeralStorage)
if err != nil {
return nil, err
}
taints, err := m.registeredTaintsGetter.Get(nodePool)
if err != nil {
klog.Warningf("could not extract taints from the nodepool: %s. Continuing on with empty taint list", err)
taints = []apiv1.Taint{}
}
node.Spec = apiv1.NodeSpec{
Taints: taints,
}
if shape.GPU > 0 {
node.Spec.Taints = append(node.Spec.Taints, apiv1.Taint{
Key: "nvidia.com/gpu",
Value: "",
Effect: "NoSchedule",
})
}
if err != nil {
return nil, err
}
node.Status.Capacity[apiv1.ResourcePods] = *resource.NewQuantity(110, resource.DecimalSI)
node.Status.Capacity[apiv1.ResourceCPU] = *resource.NewQuantity(int64(shape.CPU), resource.DecimalSI)
node.Status.Capacity[apiv1.ResourceMemory] = *resource.NewQuantity(int64(shape.MemoryInBytes), resource.DecimalSI)
node.Status.Capacity[ipconsts.ResourceGPU] = *resource.NewQuantity(int64(shape.GPU), resource.DecimalSI)
if ephemeralStorage != -1 {
node.Status.Capacity[apiv1.ResourceEphemeralStorage] = *resource.NewQuantity(ephemeralStorage, resource.DecimalSI)
}
node.Status.Allocatable = node.Status.Capacity
availabilityDomain, err := getNodePoolAvailabilityDomain(nodePool)
if err != nil {
return nil, err
}
node.Labels = cloudprovider.JoinStringMaps(node.Labels, ocicommon.BuildGenericLabels(*nodePool.Id, nodeName, shape.Name, availabilityDomain))
node.Status.Conditions = cloudprovider.BuildReadyConditions()
return &node, nil
}
// getNodePoolAvailabilityDomain determines the availability domain of the node pool.
// This breaks down if the customer specifies more than one placement configuration,
// so the best practice is one node pool per AD if scheduling by AD matters.
// If more than one AD is defined, we always return the first one.
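// For example, a placement configuration availability domain of "Uocm:PHX-AD-1" yields "PHX-AD-1".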
func getNodePoolAvailabilityDomain(np *oke.NodePool) (string, error) {
if len(np.NodeConfigDetails.PlacementConfigs) == 0 {
return "", fmt.Errorf("node pool %q has no placement configurations", *np.Id)
}
if len(np.NodeConfigDetails.PlacementConfigs) > 1 {
klog.Warningf("node pool %q has more than 1 placement config so picking first availability domain", *np.Id)
}
// Get the availability domain which is by default in the format of `Uocm:PHX-AD-1`
// and remove the hash prefix.
availabilityDomain := strings.Split(*np.NodeConfigDetails.PlacementConfigs[0].AvailabilityDomain, ":")[1]
return availabilityDomain, nil
}
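// addTaint adds the given taint to the node via the Kubernetes API, retrying up to maxAddTaintRetries times.
// On an update conflict it refetches the node and retries after conflictRetryInterval, as long as
// maxRetryDeadline has not passed. If every attempt fails, the failure is logged and nil is returned.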
func addTaint(node *apiv1.Node, client kubernetes.Interface, taintKey string, effect apiv1.TaintEffect) error {
retryDeadline := time.Now().Add(maxRetryDeadline)
freshNode := node.DeepCopy()
var err error
refresh := false
for i := 0; i < maxAddTaintRetries; i++ {
if refresh {
// Get the newest version of the node.
freshNode, err = client.CoreV1().Nodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
if err != nil || freshNode == nil {
klog.Warningf("Error while adding %v taint on node %v: %v", taintKey, node.Name, err)
}
}
if !addTaintToSpec(freshNode, taintKey, effect) {
if !refresh {
// Make sure we have the latest version before skipping update.
refresh = true
continue
}
return nil
}
_, err = client.CoreV1().Nodes().Update(context.TODO(), freshNode, metav1.UpdateOptions{})
if err != nil && IsConflict(err) && time.Now().Before(retryDeadline) {
refresh = true
time.Sleep(conflictRetryInterval)
continue
}
if err != nil {
klog.Warningf("Error while adding %v taint on node %v: %v", taintKey, node.Name, err)
return err
}
klog.V(1).Infof("Successfully added %v on node %v", taintKey, node.Name)
return nil
}
klog.Errorf("Could not add taint %v on node %v in %d attempts", taintKey, node.Name, maxAddTaintRetries)
return nil
}
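// addTaintToSpec appends the taint to the node's spec and returns true, or returns false if a taint with
// the same key is already present. The taint value is set to the current Unix timestamp.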
func addTaintToSpec(node *apiv1.Node, taintKey string, effect apiv1.TaintEffect) bool {
for _, taint := range node.Spec.Taints {
if taint.Key == taintKey {
klog.V(2).Infof("%v already present on node %v, taint: %v", taintKey, node.Name, taint)
return false
}
}
node.Spec.Taints = append(node.Spec.Taints, apiv1.Taint{
Key: taintKey,
Value: fmt.Sprint(time.Now().Unix()),
Effect: effect,
})
return true
}
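// getEphemeralResourceRequestsInBytes returns the ephemeral storage size requested via the node pool's
// freeform tags, in bytes, or -1 when the tag is not set. Illustrative example: a tag value of "100Gi"
// parses to 107374182400 bytes.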
func getEphemeralResourceRequestsInBytes(tags map[string]string) (int64, error) {
for key, value := range tags {
if key == npconsts.EphemeralStorageSize {
klog.V(4).Infof("ephemeral-storage size set with value : %v", value)
value = strings.ReplaceAll(value, " ", "")
resourceSize, err := resource.ParseQuantity(value)
if err != nil {
return -1, err
}
klog.V(4).Infof("ephemeral-storage size = %v (%v)", resourceSize.Value(), resourceSize.Format)
return resourceSize.Value(), nil
}
}
klog.V(4).Infof("ephemeral-storage size not set as part of the nodepool's freeform tags")
return -1, nil
}
// IsConflict checks if the error is a conflict
func IsConflict(err error) bool {
return ReasonForError(err) == metav1.StatusReasonConflict
}
// ReasonForError returns the error's reason
func ReasonForError(err error) metav1.StatusReason {
if status := APIStatus(nil); errors.As(err, &status) {
return status.Status().Reason
}
return metav1.StatusReasonUnknown
}
// APIStatus allows the conversion of errors into status objects
type APIStatus interface {
Status() metav1.Status
}