internal/deployers/eksapi/deployer.go (303 lines of code) (raw):
package eksapi
import (
"flag"
"fmt"
"path/filepath"
"time"
"github.com/aws/aws-k8s-tester/internal"
"github.com/aws/aws-k8s-tester/internal/awssdk"
"github.com/aws/aws-k8s-tester/internal/metrics"
"github.com/aws/aws-k8s-tester/internal/util"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
ekstypes "github.com/aws/aws-sdk-go-v2/service/eks/types"
"github.com/octago/sflags/gen/gpflag"
"github.com/spf13/pflag"
"golang.org/x/exp/slices"
"k8s.io/klog"
"sigs.k8s.io/kubetest2/pkg/types"
)
const (
	// DeployerName is the name of this deployer.
	DeployerName = "eksapi"

	// ResourcePrefix is joined with the kubetest2 run ID (see Init) to name the
	// AWS resources this deployer creates.
	ResourcePrefix = "kubetest2-" + DeployerName
)

// SupportedNodeNameStrategy lists the values accepted for --node-name-strategy
// when --unmanaged-nodes is set.
var SupportedNodeNameStrategy = []string{"SessionName", "EC2PrivateDNSName"}
// Compile-time checks that *deployer implements the optional kubetest2
// deployer interfaces; the build fails if a required method goes missing.
var (
	_ types.DeployerWithKubeconfig = (*deployer)(nil)
	_ types.DeployerWithInit       = (*deployer)(nil)
	_ types.DeployerWithFinish     = (*deployer)(nil)
)
// deployer is the eksapi kubetest2 deployer. NewDeployer returns it as a
// types.Deployer, and it also satisfies the optional DeployerWithKubeconfig,
// DeployerWithInit and DeployerWithFinish interfaces.
type deployer struct {
	// commonOptions are the kubetest2 options handed to NewDeployer; the run
	// ID (Init) and run directory (Kubeconfig) are read from them.
	commonOptions types.Options

	// deployerOptions holds the flag-bound eksapi options. It is embedded, so
	// its fields are also reachable directly on the deployer (e.g. d.Nodes).
	deployerOptions

	// metrics is chosen in Init: a CloudWatch registry when --emit-metrics is
	// set, otherwise a no-op registry.
	metrics metrics.MetricRegistry

	// Resource managers and the AWS clients they share, all constructed in
	// Init. staticClusterManager is only set when --static-cluster-name is set.
	infraManager         *InfrastructureManager
	clusterManager       *ClusterManager
	addonManager         *AddonManager
	nodeManager          *nodeManager
	logManager           *logManager
	staticClusterManager *StaticClusterManager
	awsClients           *awsClients

	// State populated by Up. infra stays nil for a static cluster, because Up
	// only creates the infrastructure stack when --static-cluster-name is empty.
	infra     *Infrastructure
	cluster   *Cluster
	k8sClient *k8sClient

	// initTime is recorded by Init and used by Finish to report total runtime.
	initTime time.Time
}
// deployerOptions holds the eksapi-specific settings. Each field is exposed as
// a command-line flag: bindFlags passes the deployer (which embeds this struct)
// to gpflag.Parse, which reads the `flag` (flag name) and `desc` (help text)
// struct tags.
//
// Several zero values are replaced with defaults by verifyUpFlags before Up
// uses them: KubernetesVersion (auto-detected), Nodes, IPFamily, the three
// timeouts, and, depending on --unmanaged-nodes, NodeNameStrategy and
// UserDataFormat or AMIType. Up also defaults ExpectedAMI to AMI.
type deployerOptions struct {
	Addons                      []string      `flag:"addons" desc:"Managed addons (name:version pairs) to create in the cluster. Use 'latest' for the most recent version, or 'default' for the default version."`
	AMI                         string        `flag:"ami" desc:"AMI for unmanaged nodes"`
	AMIType                     string        `flag:"ami-type" desc:"AMI type for managed nodes"`
	AutoMode                    bool          `flag:"auto-mode" desc:"Enable EKS Auto Mode"`
	CapacityReservation         bool          `flag:"capacity-reservation" desc:"Use capacity reservation for the unmanaged nodegroup"`
	ClusterCreationTimeout      time.Duration `flag:"cluster-creation-timeout" desc:"Time to wait for cluster to be created and become active."`
	ClusterRoleServicePrincipal string        `flag:"cluster-role-service-principal" desc:"Additional service principal that can assume the cluster role"`
	EFA                         bool          `flag:"efa" desc:"Create EFA interfaces on the node of an unmanaged nodegroup. One instance type must be passed if set. Requires --unmanaged-nodes and --instance-types."`
	EKSEndpointURL              string        `flag:"endpoint-url" desc:"Endpoint URL for the EKS API"`
	EmitMetrics                 bool          `flag:"emit-metrics" desc:"Record and emit metrics to CloudWatch"`
	ExpectedAMI                 string        `flag:"expected-ami" desc:"Expected AMI of nodes. Up will fail if the actual nodes are not utilizing the expected AMI. Defaults to --ami if defined."`
	// TODO: remove this once it's no longer used in downstream jobs
	GenerateSSHKey              bool          `flag:"generate-ssh-key" desc:"Generate an SSH key to use for tests. The generated key should not be used in production, as it will not have a passphrase."`
	InstanceTypes               []string      `flag:"instance-types" desc:"Node instance types. Cannot be used with --instance-type-archs"`
	InstanceTypeArchs           []string      `flag:"instance-type-archs" desc:"Use default node instance types for specific architectures. Cannot be used with --instance-types"`
	IPFamily                    string        `flag:"ip-family" desc:"IP family for the cluster (ipv4 or ipv6)"`
	KubeconfigPath              string        `flag:"kubeconfig" desc:"Path to kubeconfig"`
	KubernetesVersion           string        `flag:"kubernetes-version" desc:"cluster Kubernetes version"`
	LogBucket                   string        `flag:"log-bucket" desc:"S3 bucket for storing logs for each run. If empty, logs will not be stored."`
	NodeCreationTimeout         time.Duration `flag:"node-creation-timeout" desc:"Time to wait for nodes to be created/launched. This should consider instance availability."`
	NodeReadyTimeout            time.Duration `flag:"node-ready-timeout" desc:"Time to wait for all nodes to become ready"`
	Nodes                       int           `flag:"nodes" desc:"number of nodes to launch in cluster"`
	NodeNameStrategy            string        `flag:"node-name-strategy" desc:"Specifies the naming strategy for node. Allowed values: ['SessionName', 'EC2PrivateDNSName'], default to EC2PrivateDNSName"`
	Region                      string        `flag:"region" desc:"AWS region for EKS cluster"`
	StaticClusterName           string        `flag:"static-cluster-name" desc:"Optional when re-use existing cluster and node group by querying the kubeconfig and run test"`
	TuneVPCCNI                  bool          `flag:"tune-vpc-cni" desc:"Apply tuning parameters to the VPC CNI DaemonSet"`
	UnmanagedNodes              bool          `flag:"unmanaged-nodes" desc:"Use an AutoScalingGroup instead of an EKS-managed nodegroup. Requires --ami"`
	UpClusterHeaders            []string      `flag:"up-cluster-header" desc:"Additional header to add to eks:CreateCluster requests. Specified in the same format as curl's -H flag."`
	UserDataFormat              string        `flag:"user-data-format" desc:"Format of the node instance user data"`
}
// NewDeployer implements deployer.New for EKS by driving the EKS (and other
// AWS) APIs directly. It returns the deployer together with the flag set that
// is bound to the deployer's options.
func NewDeployer(opts types.Options) (types.Deployer, *pflag.FlagSet) {
	// Only the fields that are not flag-controlled are set here; the rest are
	// filled in from the command line through the returned flag set.
	d := new(deployer)
	d.commonOptions = opts
	fs := bindFlags(d)
	return d, fs
}
// bindFlags creates a pflag.FlagSet bound to d's tagged option fields (the
// `flag`/`desc` tags declared on the embedded deployerOptions) and appends
// klog's Go flags to it.
//
// If the tags cannot be turned into flags, the process exits via klog.Fatalf,
// and the parse error is included in the message.
func bindFlags(d *deployer) *pflag.FlagSet {
	flags, err := gpflag.Parse(d)
	if err != nil {
		// Include the cause so a malformed tag is diagnosable from the log.
		klog.Fatalf("unable to bind flags for deployer: %v", err)
		return nil
	}
	// klog.InitFlags(nil) registers klog's flags on flag.CommandLine; they are
	// then exposed through the returned pflag set as well.
	klog.InitFlags(nil)
	flags.AddGoFlagSet(flag.CommandLine)
	return flags
}
// Version reports the deployer's version, taken from internal.Version.
func (d *deployer) Version() string {
	version := internal.Version
	return version
}
// Init prepares the deployer before the other phases run. It records the
// start time later reported by Finish and builds the AWS clients (honoring
// --endpoint-url). It picks the metrics registry (CloudWatch when
// --emit-metrics is set, otherwise no-op) and constructs the resource
// managers, all keyed by ResourcePrefix plus the kubetest2 run ID. The static
// cluster manager is only created when --static-cluster-name is set.
func (d *deployer) Init() error {
	d.initTime = time.Now()

	cfg := awssdk.NewConfig()
	d.awsClients = newAWSClients(cfg, d.EKSEndpointURL)
	id := ResourcePrefix + "-" + d.commonOptions.RunID()

	switch {
	case d.deployerOptions.EmitMetrics:
		d.metrics = metrics.NewCloudWatchRegistry(cloudwatch.NewFromConfig(cfg))
	default:
		d.metrics = metrics.NewNoopMetricRegistry()
	}

	clients := d.awsClients
	d.infraManager = NewInfrastructureManager(clients, id, d.metrics)
	d.clusterManager = NewClusterManager(clients, id)
	d.addonManager = NewAddonManager(clients)
	d.nodeManager = NewNodeManager(clients, id)
	d.logManager = NewLogManager(clients, id)

	if name := d.deployerOptions.StaticClusterName; name != "" {
		d.staticClusterManager = NewStaticClusterManager(&d.deployerOptions)
	}
	return nil
}
// Finish records the total runtime since Init, in seconds, and emits every
// recorded metric through the registry chosen in Init.
func (d *deployer) Finish() error {
	// Duration.Seconds already yields a float64, so no conversion is needed.
	elapsed := time.Since(d.initTime).Seconds()
	d.metrics.Record(totalRuntimeSeconds, elapsed, nil)
	return d.metrics.Emit()
}
// Build is a no-op; this deployer has no build step.
func (*deployer) Build() error { return nil }
// DumpClusterLogs is a no-op; node logs are instead gathered by the log
// manager during Up and Down.
func (*deployer) DumpClusterLogs() error { return nil }
// Kubeconfig returns the path of a kubeconfig for the cluster under test.
//
// If --kubeconfig was given, that path is returned unchanged. Otherwise a
// kubeconfig is generated from the cluster recorded by Up and written to
// <run dir>/kubeconfig. The path is then cached in KubeconfigPath so later
// calls reuse the file.
//
// An error is returned if no path was given and no cluster is known yet (for
// example when the test phase runs in a process that never ran Up), or if
// writing the file fails.
func (d *deployer) Kubeconfig() (string, error) {
	if d.KubeconfigPath != "" {
		return d.KubeconfigPath, nil
	}
	// Fail explicitly instead of handing a nil cluster to writeKubeconfig.
	if d.cluster == nil {
		return "", fmt.Errorf("cannot generate kubeconfig: no cluster is known (run Up first or pass --kubeconfig)")
	}
	kubeconfigPath := filepath.Join(d.commonOptions.RunDir(), "kubeconfig")
	if err := writeKubeconfig(d.cluster, kubeconfigPath); err != nil {
		return "", fmt.Errorf("failed to write kubeconfig to %s: %w", kubeconfigPath, err)
	}
	d.KubeconfigPath = kubeconfigPath
	return d.KubeconfigPath, nil
}
// Up creates an EKS cluster and brings its nodes up. With
// --static-cluster-name, it reuses an existing cluster instead.
//
// Flags are validated and defaulted first. For a regular run, Up then:
//   - creates the infrastructure stack;
//   - gets or creates the cluster and writes a kubeconfig;
//   - creates the aws-auth ConfigMap (unmanaged nodes only) and the managed
//     addons, optionally tuning the VPC CNI;
//   - creates the nodes and waits for --nodes of them to become ready;
//   - optionally emits node metrics;
//   - gathers node logs on a best-effort basis.
//
// For a static cluster, the infrastructure stack is skipped. Once the
// kubeconfig exists, only the static cluster manager's node-provisioning step
// runs.
func (d *deployer) Up() error {
	if err := d.verifyUpFlags(); err != nil {
		// %w keeps the validation error inspectable with errors.Is/errors.As.
		return fmt.Errorf("up flags are invalid: %w", err)
	}
	if d.deployerOptions.StaticClusterName == "" {
		infra, err := d.infraManager.createInfrastructureStack(&d.deployerOptions)
		if err != nil {
			return err
		}
		d.infra = infra
	}
	// d.infra is still nil here for a static cluster.
	cluster, err := d.clusterManager.getOrCreateCluster(d.infra, &d.deployerOptions)
	if err != nil {
		return err
	}
	d.cluster = cluster
	kubeconfig, err := d.Kubeconfig()
	if err != nil {
		return err
	}
	d.k8sClient, err = newK8sClient(kubeconfig)
	if err != nil {
		return err
	}
	if d.deployerOptions.StaticClusterName != "" {
		klog.Infof("inited k8sclient, skip the rest resource creation for static cluster")
		d.staticClusterManager.SetK8sClient(kubeconfig)
		if err := d.staticClusterManager.EnsureNodeForStaticCluster(); err != nil {
			klog.Errorf("Failed to launch nodes: %v", err)
			return err
		}
		klog.Infof("Nodes launched for static cluster")
		return nil
	}
	if d.UnmanagedNodes {
		// Only unmanaged nodes get an aws-auth ConfigMap, built from the node
		// role ARN and --node-name-strategy.
		if err := d.k8sClient.createAWSAuthConfigMap(d.NodeNameStrategy, d.infra.nodeRoleARN); err != nil {
			return err
		}
	}
	// --expected-ami defaults to --ami when only the latter was given.
	if d.AMI != "" && d.ExpectedAMI == "" {
		d.ExpectedAMI = d.AMI
	}
	if err := d.addonManager.createAddons(d.infra, d.cluster, &d.deployerOptions); err != nil {
		return err
	}
	if d.deployerOptions.TuneVPCCNI {
		if err := d.k8sClient.tuneVPCCNI(); err != nil {
			return err
		}
	}
	if err := d.nodeManager.createNodes(d.infra, d.cluster, &d.deployerOptions, d.k8sClient); err != nil {
		return err
	}
	if err := d.k8sClient.waitForReadyNodes(d.Nodes, d.NodeReadyTimeout); err != nil {
		return err
	}
	if d.EmitMetrics {
		if err := d.k8sClient.emitNodeMetrics(d.metrics, d.awsClients.EC2()); err != nil {
			return err
		}
	}
	if err := d.logManager.gatherLogsFromNodes(d.k8sClient, &d.deployerOptions, deployerPhaseUp); err != nil {
		// Log collection is diagnostic only; don't fail Up because of it.
		klog.Warningf("failed to gather logs from nodes: %v", err)
	}
	return nil
}
// verifyUpFlags validates the options Up depends on and fills in defaults for
// unset ones. It mutates d.deployerOptions in place.
//
// Defaults applied on every run:
//   - --kubernetes-version is detected when empty;
//   - --nodes defaults to 3;
//   - --ip-family defaults to ipv4;
//   - the cluster-creation (15m), node-creation (20m) and node-ready (5m)
//     timeouts get their defaults.
//
// For a static cluster nothing else is checked. Otherwise:
//   - --instance-types and --instance-type-archs are mutually exclusive;
//   - with --unmanaged-nodes: --ami is required and --ami-type is rejected.
//     --node-name-strategy defaults to EC2PrivateDNSName and must be one of
//     SupportedNodeNameStrategy. --user-data-format defaults to bootstrap.sh.
//     --efa requires exactly one --instance-types entry;
//   - without --unmanaged-nodes: --ami is rejected and --ami-type defaults to
//     AL2023_x86_64_STANDARD.
func (d *deployer) verifyUpFlags() error {
	if d.KubernetesVersion == "" {
		klog.Infof("--kubernetes-version is empty, attempting to detect it...")
		detectedVersion, err := detectKubernetesVersion()
		if err != nil {
			// Keep the underlying cause so the user can see why detection failed.
			return fmt.Errorf("unable to detect --kubernetes-version, flag cannot be empty: %w", err)
		}
		klog.Infof("detected --kubernetes-version=%s", detectedVersion)
		d.KubernetesVersion = detectedVersion
	}
	switch {
	case d.Nodes < 0:
		// Zero is valid (it selects the default), so only negatives are rejected.
		return fmt.Errorf("number of nodes must not be negative, got %d", d.Nodes)
	case d.Nodes == 0:
		d.Nodes = 3
		klog.Infof("Using default number of nodes: %d", d.Nodes)
	}
	if d.IPFamily == "" {
		d.IPFamily = string(ekstypes.IpFamilyIpv4)
		klog.Infof("Using default IP family: %s", d.IPFamily)
	}
	if d.ClusterCreationTimeout == 0 {
		d.ClusterCreationTimeout = 15 * time.Minute
	}
	if d.NodeCreationTimeout == 0 {
		d.NodeCreationTimeout = 20 * time.Minute
	}
	if d.NodeReadyTimeout == 0 {
		d.NodeReadyTimeout = 5 * time.Minute
	}
	if d.StaticClusterName != "" {
		klog.Infof("Skip configuration for static cluster")
		return nil
	}
	if len(d.InstanceTypes) > 0 && len(d.InstanceTypeArchs) > 0 {
		return fmt.Errorf("--instance-types and --instance-type-archs are mutually exclusive")
	}
	if d.UnmanagedNodes {
		if d.AMI == "" {
			return fmt.Errorf("--ami must be specified for --unmanaged-nodes")
		}
		if d.AMIType != "" {
			return fmt.Errorf("--ami-type should not be provided with --unmanaged-nodes")
		}
		if d.NodeNameStrategy == "" {
			d.NodeNameStrategy = "EC2PrivateDNSName"
			klog.Infof("Using default node name strategy: EC2PrivateDNSName")
		} else if !slices.Contains(SupportedNodeNameStrategy, d.NodeNameStrategy) {
			return fmt.Errorf("--node-name-strategy must be one of the following values: ['SessionName', 'EC2PrivateDNSName']")
		}
		if d.UserDataFormat == "" {
			d.UserDataFormat = "bootstrap.sh"
			klog.Infof("Using default user data format: %s", d.UserDataFormat)
		}
		if d.EFA && len(d.InstanceTypes) != 1 {
			return fmt.Errorf("--efa requires a single instance type")
		}
		return nil
	}
	// NOTE(review): the --efa flag description says it requires
	// --unmanaged-nodes, but --efa is not rejected on this managed-node path.
	// Confirm whether managed nodegroups are meant to support it.
	if d.AMI != "" {
		return fmt.Errorf("--ami should not be provided without --unmanaged-nodes")
	}
	if d.AMIType == "" {
		d.AMIType = "AL2023_x86_64_STANDARD"
		klog.Infof("Using default AMI type: %s", d.AMIType)
	}
	return nil
}
// detectKubernetesVersion returns the value used to default
// --kubernetes-version. It detects a version with util.DetectKubernetesVersion
// and reduces it with util.ParseMinorVersion (presumably to "MAJOR.MINOR",
// e.g. "1.30" — confirm against util).
//
// On failure it returns "" and an error that says which of the two steps
// failed.
func detectKubernetesVersion() (string, error) {
	fullVersion, err := util.DetectKubernetesVersion()
	if err != nil {
		return "", fmt.Errorf("detecting Kubernetes version: %w", err)
	}
	minorVersion, err := util.ParseMinorVersion(fullVersion)
	if err != nil {
		return "", fmt.Errorf("parsing minor version from %q: %w", fullVersion, err)
	}
	return minorVersion, nil
}
// IsUp reports whether the cluster managed by this deployer is active, as
// determined by the cluster manager.
func (d *deployer) IsUp() (up bool, err error) {
	up, err = d.clusterManager.isClusterActive()
	return up, err
}
// Down tears down what Up set up. Node logs are gathered first, on a
// best-effort basis. Then, for a static cluster, only the static cluster
// manager's node teardown runs; otherwise every deployer-owned resource is
// deleted via deleteResources.
func (d *deployer) Down() error {
	opts := &d.deployerOptions
	if err := d.logManager.gatherLogsFromNodes(d.k8sClient, opts, deployerPhaseDown); err != nil {
		// Log collection is diagnostic only and must not block teardown.
		klog.Warningf("failed to gather logs from nodes: %v", err)
	}
	if opts.StaticClusterName == "" {
		return deleteResources(d.infraManager, d.clusterManager, d.nodeManager, d.k8sClient, opts)
	}
	return d.staticClusterManager.TearDownNodeForStaticCluster()
}
// deleteResources deletes the deployer's resources in dependency order and
// stops at the first error:
//  1. the nodes (nm.deleteNodes receives k8sClient and opts, either of which
//     may be nil);
//  2. leaked ENIs — the EKS-managed cluster security group may still be
//     attached to one, and if it outlives the cluster it blocks deletion of
//     the VPC;
//  3. the cluster;
//  4. the infrastructure stack.
func deleteResources(im *InfrastructureManager, cm *ClusterManager, nm *nodeManager, k8sClient *k8sClient /* nillable */, opts *deployerOptions /* nillable */) error {
	teardown := []func() error{
		func() error { return nm.deleteNodes(k8sClient, opts) },
		func() error { return im.deleteLeakedENIs() },
		func() error { return cm.deleteCluster() },
		func() error { return im.deleteInfrastructureStack() },
	}
	for _, step := range teardown {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}