internal/node/validations.go (124 lines of code) (raw):

package node import ( "context" "fmt" "time" "github.com/pkg/errors" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "github.com/aws/eks-hybrid/internal/kubelet" ) const defaultStaticPodManifestPath = "/etc/kubernetes/manifest" const ( nodeValidationInterval = 10 * time.Second nodeValidationTimeout = 1 * time.Minute nodeValidationMaxRetries = 5 ) // NodeValidationOptions are options to configure node validations. type NodeValidationOptions struct { ValidationInterval time.Duration ValidationTimeout time.Duration MaxRetries int } // NodeValidationOption defines a function type for setting options. type NodeValidationOption func(*NodeValidationOptions) // DefaultNodeValidationOptions returns the default configuration options. func DefaultNodeValidationOptions() NodeValidationOptions { return NodeValidationOptions{ ValidationInterval: nodeValidationInterval, ValidationTimeout: nodeValidationTimeout, MaxRetries: nodeValidationMaxRetries, } } // WithValidationInterval sets the validation interval. func WithValidationInterval(interval time.Duration) NodeValidationOption { return func(opts *NodeValidationOptions) { opts.ValidationInterval = interval } } // WithValidationTimeout sets the validation timeout. func WithValidationTimeout(timeout time.Duration) NodeValidationOption { return func(opts *NodeValidationOptions) { opts.ValidationTimeout = timeout } } // WithMaxRetries sets the maximum number of retries. func WithMaxRetries(maxRetries int) NodeValidationOption { return func(opts *NodeValidationOptions) { opts.MaxRetries = maxRetries } } func IsUnscheduled(ctx context.Context) error { node, err := getCurrentNode(ctx) if err != nil { if apierrors.IsNotFound(err) { return nil } return err } if !node.Spec.Unschedulable { return fmt.Errorf("node is schedulable") } return nil } func IsDrained(ctx context.Context) (bool, error) { nodeName, err := kubelet.GetNodeName() if err != nil { return false, errors.Wrap(err, "getting node name from kubelet") } clientset, err := kubelet.GetKubeClientFromKubeConfig() if err != nil { return false, errors.Wrap(err, "failed to create kubernetes client") } pods, err := GetPodsOnNode(ctx, nodeName, clientset) if err != nil { return false, errors.Wrapf(err, "getting pods for node %s", nodeName) } return isDrained(pods) } func isDrained(pods []v1.Pod) (bool, error) { for _, filter := range getDrainedPodFilters() { var err error pods, err = filter(pods) if err != nil { return false, errors.Wrap(err, "running filter on pods") } } return len(pods) == 0, nil } func IsInitialized(ctx context.Context) error { _, err := getCurrentNode(ctx) if err != nil { return err } return nil } func getCurrentNode(ctx context.Context) (*v1.Node, error) { nodeName, err := kubelet.GetNodeName() if err != nil { return nil, err } clientset, err := kubelet.GetKubeClientFromKubeConfig() if err != nil { return nil, err } return getNode(ctx, nodeName, clientset) } func getNode(ctx context.Context, nodeName string, clientset kubernetes.Interface, options ...NodeValidationOption) (*v1.Node, error) { opts := DefaultNodeValidationOptions() for _, option := range options { option(&opts) } var node *v1.Node var err error consecutiveErrors := 0 err = wait.PollUntilContextTimeout(ctx, opts.ValidationInterval, opts.ValidationTimeout, true, func(ctx context.Context) (bool, error) { node, err = clientset.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) if err != nil { consecutiveErrors += 1 if consecutiveErrors == opts.MaxRetries { return false, errors.Wrap(err, "failed to get current node") } return false, nil // continue polling } return true, nil }) return node, err }