// internal/deployers/eksapi/k8s.go
package eksapi

import (
	"context"
	"errors"
	"fmt"
	"net/url"
	"strings"
	"time"

	"github.com/aws/aws-k8s-tester/internal/metrics"
	"github.com/aws/aws-sdk-go-v2/service/ec2"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog/v2"
	"sigs.k8s.io/controller-runtime/pkg/client"
	crlog "sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func init() {
	// controller-runtime will complain loudly if this isn't set, even though we don't use this logger
	crlog.SetLogger(zap.New())
}
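
// k8sClient bundles a rest.Config with both a typed clientset and a
// controller-runtime client for the same cluster.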
type k8sClient struct {
	config    *rest.Config
	clientset kubernetes.Interface
	client    client.Client
}
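
// newK8sClient builds a k8sClient from the kubeconfig at the given path.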
func newK8sClient(kubeconfigPath string) (*k8sClient, error) {
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
	if err != nil {
		return nil, err
	}
	// return errors instead of panicking, consistent with this function's signature
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("failed to create clientset: %v", err)
	}
	c, err := client.New(config, client.Options{})
	if err != nil {
		return nil, fmt.Errorf("failed to create controller-runtime client: %v", err)
	}
	return &k8sClient{
		config:    config,
		clientset: clientset,
		client:    c,
	}, nil
}
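
// waitForReadyNodes blocks until at least nodeCount nodes have a Ready
// condition of True, or returns an error when the timeout elapses.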
func (k *k8sClient) waitForReadyNodes(nodeCount int, timeout time.Duration) error {
	klog.Infof("waiting up to %v for %d node(s) to be ready...", timeout, nodeCount)
	readyNodes := sets.NewString()
	watcher, err := k.clientset.CoreV1().Nodes().Watch(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("failed to create node watcher: %v", err)
	}
	defer watcher.Stop()
	initialReadyNodes, err := k.getReadyNodes()
	if err != nil {
		return fmt.Errorf("failed to get ready nodes: %v", err)
	}
	// seed the set with nodes that were already ready before the watch began,
	// so their count isn't lost when the first watch event arrives
	for _, node := range initialReadyNodes {
		readyNodes.Insert(node.Name)
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	for readyNodes.Len() < nodeCount {
		select {
		case event, ok := <-watcher.ResultChan():
			if !ok {
				return errors.New("node watch channel was closed unexpectedly")
			}
			if event.Type == watch.Error {
				msg := "unexpected error event type from node watcher"
				if statusErr, ok := event.Object.(*metav1.Status); ok {
					return fmt.Errorf("%s: %s", msg, statusErr.String())
				}
				return fmt.Errorf("%s: %+v", msg, event.Object)
			}
			if event.Object != nil && event.Type != watch.Deleted {
				if node, ok := event.Object.(*corev1.Node); ok {
					if isNodeReady(node) {
						readyNodes.Insert(node.Name)
					}
				}
			}
		case <-ctx.Done():
			return fmt.Errorf("timed out waiting for %d nodes to be ready: %w", nodeCount, ctx.Err())
		}
	}
	klog.Infof("%d node(s) are ready: %v", readyNodes.Len(), readyNodes)
	return nil
}
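
// waitForNodeDeletion blocks until every node observed at the start of the
// watch (plus any added while watching) has been deleted, or returns an
// error when the timeout elapses.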
func (k *k8sClient) waitForNodeDeletion(timeout time.Duration) error {
	klog.Infof("waiting up to %v for node(s) to be deleted...", timeout)
	nodes := sets.NewString()
	watcher, err := k.clientset.CoreV1().Nodes().Watch(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("failed to create node watcher: %v", err)
	}
	defer watcher.Stop()
	initialNodes, err := k.clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("failed to list nodes: %v", err)
	}
	for _, node := range initialNodes.Items {
		nodes.Insert(node.Name)
	}
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	for nodes.Len() > 0 {
		select {
		case event, ok := <-watcher.ResultChan():
			if !ok {
				return errors.New("node watch channel was closed unexpectedly")
			}
			if event.Type == watch.Error {
				msg := "unexpected error event type from node watcher"
				if statusErr, ok := event.Object.(*metav1.Status); ok {
					return fmt.Errorf("%s: %s", msg, statusErr.String())
				}
				return fmt.Errorf("%s: %+v", msg, event.Object)
			}
			if event.Object != nil {
				node, ok := event.Object.(*corev1.Node)
				if !ok {
					return fmt.Errorf("node watcher received an object that isn't a Node: %+v", event.Object)
				}
				switch event.Type {
				case watch.Added:
					nodes.Insert(node.Name)
				case watch.Deleted:
					nodes.Delete(node.Name)
				}
			}
		case <-ctx.Done():
			return fmt.Errorf("timed out waiting for nodes to be deleted: %w", ctx.Err())
		}
	}
	klog.Info("all nodes deleted!")
	return nil
}
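
// getReadyNodes lists all nodes and returns those with a Ready condition of True.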
func (k *k8sClient) getReadyNodes() ([]corev1.Node, error) {
	nodes, err := k.clientset.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	var readyNodes []corev1.Node
	for _, node := range nodes.Items {
		if isNodeReady(&node) {
			readyNodes = append(readyNodes, node)
		}
	}
	return readyNodes, nil
}
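
// isNodeReady reports whether the node's Ready condition is True.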
func isNodeReady(node *corev1.Node) bool {
	c := getNodeReadyCondition(node)
	if c == nil {
		return false
	}
	return c.Status == corev1.ConditionTrue
}
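
// getNodeReadyCondition returns the node's Ready condition, or nil if the
// node does not have one.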
func getNodeReadyCondition(node *corev1.Node) *corev1.NodeCondition {
	for _, c := range node.Status.Conditions {
		if c.Type == corev1.NodeReady {
			return &c
		}
	}
	return nil
}
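
// createAWSAuthConfigMap creates the aws-auth ConfigMap in kube-system,
// mapping the given node role ARN so that nodes can register with the cluster.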
func (k *k8sClient) createAWSAuthConfigMap(nodeNameStrategy string, nodeRoleARN string) error {
	mapRoles, err := generateAuthMapRole(nodeNameStrategy, nodeRoleARN)
	if err != nil {
		return err
	}
	klog.Infof("generated AuthMapRole %s", mapRoles)
	_, err = k.clientset.CoreV1().ConfigMaps("kube-system").Create(context.TODO(), &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "aws-auth",
			Namespace: "kube-system",
		},
		Data: map[string]string{
			"mapRoles": mapRoles,
		},
	}, metav1.CreateOptions{})
	return err
}
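
// getNodeInstanceIDs extracts the EC2 instance ID from each node's provider ID.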
func getNodeInstanceIDs(nodes []corev1.Node) ([]string, error) {
	var instanceIds []string
	var errs []error
	for _, node := range nodes {
		providerId, err := parseKubernetesProviderID(node.Spec.ProviderID)
		if err != nil {
			errs = append(errs, err)
			continue
		}
		instanceIds = append(instanceIds, providerId.InstanceID)
	}
	if len(errs) > 0 {
		return nil, errors.Join(errs...)
	}
	return instanceIds, nil
}
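
// emitNodeMetrics records time-to-registration and time-to-ready metrics for
// every ready node, measured from the EC2 instance launch time.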
func (k *k8sClient) emitNodeMetrics(metricRegistry metrics.MetricRegistry, ec2Client *ec2.Client) error {
	nodes, err := k.getReadyNodes()
	if err != nil {
		return err
	}
	var errs []error
	for _, node := range nodes {
		providerId, err := parseKubernetesProviderID(node.Spec.ProviderID)
		if err != nil {
			errs = append(errs, err)
			continue
		}
		instanceInfo, err := ec2Client.DescribeInstances(context.TODO(), &ec2.DescribeInstancesInput{
			InstanceIds: []string{providerId.InstanceID},
		})
		if err != nil {
			errs = append(errs, err)
			continue
		}
		// guard against an empty response instead of panicking on the index below
		if len(instanceInfo.Reservations) == 0 || len(instanceInfo.Reservations[0].Instances) == 0 {
			errs = append(errs, fmt.Errorf("no EC2 instance found for ID: %s", providerId.InstanceID))
			continue
		}
		instance := instanceInfo.Reservations[0].Instances[0]
		launchTime := *instance.LaunchTime
		timeToRegistration := node.ObjectMeta.CreationTimestamp.Time.Sub(launchTime)
		timeToReady := getNodeReadyCondition(&node).LastTransitionTime.Time.Sub(launchTime)
		nodeDimensions := map[string]string{
			"instanceType": string(instance.InstanceType),
			"os":           node.Status.NodeInfo.OperatingSystem,
			"osImage":      node.Status.NodeInfo.OSImage,
			"arch":         node.Status.NodeInfo.Architecture,
		}
		// we'll emit the metrics with different subset(s) of dimensions, to make aggregation simpler
		var nodeDimensionSets []map[string]string
		nodeDimensionSets = append(nodeDimensionSets, nodeDimensions)
		var osDistro string
		if strings.HasPrefix(node.Status.NodeInfo.OSImage, "Amazon Linux") {
			// on al2: "Amazon Linux 2"
			// on al2023: "Amazon Linux 2023.6.20241010"
			parts := strings.Split(node.Status.NodeInfo.OSImage, ".")
			amazonLinuxMajorVersion := parts[0]
			osDistro = amazonLinuxMajorVersion
		}
		if osDistro != "" {
			nodeDimensions["osDistro"] = osDistro
			// if we have an osDistro, add a pared-down dimension set that includes it
			nodeDimensionSets = append(nodeDimensionSets, map[string]string{
				"osDistro":     nodeDimensions["osDistro"],
				"instanceType": nodeDimensions["instanceType"],
				"arch":         nodeDimensions["arch"],
			})
		}
		for _, nodeDimensionSet := range nodeDimensionSets {
			metricRegistry.Record(nodeTimeToRegistrationSeconds, timeToRegistration.Seconds(), nodeDimensionSet)
			metricRegistry.Record(nodeTimeToReadySeconds, timeToReady.Seconds(), nodeDimensionSet)
		}
	}
	return errors.Join(errs...)
}
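
// KubernetesProviderID holds the fields parsed from an aws:// provider ID.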
type KubernetesProviderID struct {
	AvailabilityZone string
	InstanceID       string
}
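
// parseKubernetesProviderID parses a provider ID of the form
// aws:///us-west-2a/i-12345abcdefg.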
func parseKubernetesProviderID(rawProviderId string) (*KubernetesProviderID, error) {
	// named "parsed" to avoid shadowing the url package
	parsed, err := url.Parse(rawProviderId)
	if err != nil {
		return nil, fmt.Errorf("malformed provider ID: %s", rawProviderId)
	}
	if parsed.Scheme != "aws" {
		return nil, fmt.Errorf("unsupported provider ID scheme: %s", parsed.Scheme)
	}
	if parsed.Path == "" {
		return nil, fmt.Errorf("provider ID path is empty: %s", rawProviderId)
	}
	// example: /us-west-2a/i-12345abcdefg
	parts := strings.Split(parsed.Path, "/")
	if len(parts) != 3 {
		return nil, fmt.Errorf("provider ID path does not have 3 parts: %s", parsed.Path)
	}
	return &KubernetesProviderID{
		AvailabilityZone: parts[1],
		InstanceID:       parts[2],
	}, nil
}