func New()

in eks/eks.go [211:672]


// New creates a Tester from cfg.
//
// It validates cfg (setting defaults), sets up logging, and verifies the
// external tools the tester shells out to: the AWS CLI, kubectl (downloaded
// from cfg.KubectlDownloadURL when missing), and aws-iam-authenticator when
// both its path and download URL are configured. It then builds AWS SDK v1
// and v2 clients, probes them with read-only calls (STS, EC2, ECR, EKS),
// resolves the availability zones to use, and tries to build a Kubernetes
// client from previously recorded cluster state; if that succeeds the
// testers are created as well.
//
// cfg is modified in place (tool paths may be made absolute; Status fields and
// AvailabilityZoneNames are filled in) and cfg.Sync is called along the way.
//
// On failure New returns a nil Tester and a non-nil error, and stops the
// SIGINT/SIGTERM notification it may already have registered.
func New(cfg *eksconfig.Config) (ts *Tester, err error) {
	if err = cfg.ValidateAndSetDefaults(); err != nil {
		return nil, err
	}

	lg, logWriter, logFile, err := logutil.NewWithStderrWriter(cfg.LogLevel, cfg.LogOutputs)
	if err != nil {
		return nil, err
	}
	_ = zap.ReplaceGlobals(lg)
	lg.Info("set up log writer and file", zap.Strings("outputs", cfg.LogOutputs), zap.Bool("is-color", cfg.LogColor))
	cfg.Sync()

	colorize := cfg.Colorize

	fmt.Fprint(logWriter, colorize("\n\n\n[yellow]*********************************\n"))
	fmt.Fprintln(logWriter, "😎 🙏 🚶 ✔️ 👍")
	fmt.Fprintf(logWriter, colorize("[light_green]New %q [default](%q)\n\n"), cfg.ConfigPath, version.Version())

	// ensureExecutable makes a best-effort attempt to mark a tool binary as
	// executable. The file may already be executable while the process does
	// not own the file/directory, so a failure is only logged.
	// ref. https://github.com/aws/aws-k8s-tester/issues/66
	ensureExecutable := func(p string) {
		if xerr := fileutil.EnsureExecutable(p); xerr != nil {
			lg.Warn("failed to ensure executable", zap.Error(xerr))
		}
	}
	// toolOutput runs "p args..." with a 15-second timeout and returns its
	// combined stdout/stderr.
	toolOutput := func(p string, args ...string) ([]byte, error) {
		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
		defer cancel()
		return exec.New().CommandContext(ctx, p, args...).CombinedOutput()
	}
	// absPath returns p made absolute, or p unchanged when that fails;
	// filepath.Abs returns "" on error, which must not replace a usable path.
	absPath := func(p string) string {
		if abs, aerr := filepath.Abs(p); aerr == nil {
			return abs
		}
		return p
	}

	var vo []byte

	// aws --version
	ensureExecutable(cfg.AWSCLIPath)
	if vo, err = toolOutput(cfg.AWSCLIPath, "--version"); err != nil {
		return nil, fmt.Errorf("'aws --version' failed (output %q, error %w); required for 'aws eks update-kubeconfig'", string(vo), err)
	}
	lg.Info(
		"aws version",
		zap.String("aws-cli-path", cfg.AWSCLIPath),
		zap.String("aws-version", string(vo)),
	)

	lg.Info("mkdir", zap.String("kubectl-path-dir", filepath.Dir(cfg.KubectlPath)))
	if err = os.MkdirAll(filepath.Dir(cfg.KubectlPath), 0700); err != nil {
		return nil, fmt.Errorf("could not create %q (%w)", filepath.Dir(cfg.KubectlPath), err)
	}
	if !fileutil.Exist(cfg.KubectlPath) {
		if cfg.KubectlDownloadURL == "" {
			return nil, fmt.Errorf("%q does not exist but no download URL", cfg.KubectlPath)
		}
		cfg.KubectlPath = absPath(cfg.KubectlPath)
		lg.Info("downloading kubectl", zap.String("kubectl-path", cfg.KubectlPath))
		if err = httputil.Download(lg, os.Stderr, cfg.KubectlDownloadURL, cfg.KubectlPath); err != nil {
			return nil, err
		}
	} else {
		lg.Info("skipping kubectl download; already exist", zap.String("kubectl-path", cfg.KubectlPath))
	}
	ensureExecutable(cfg.KubectlPath)
	// kubectl version --client=true
	if vo, err = toolOutput(cfg.KubectlPath, "version", "--client=true"); err != nil {
		return nil, fmt.Errorf("'kubectl version' failed (output %q, error %w)", string(vo), err)
	}
	lg.Info(
		"kubectl version",
		zap.String("kubectl-path", cfg.KubectlPath),
		zap.String("kubectl-version", string(vo)),
	)

	if cfg.AWSIAMAuthenticatorPath != "" && cfg.AWSIAMAuthenticatorDownloadURL != "" {
		lg.Info("mkdir", zap.String("aws-iam-authenticator-path-dir", filepath.Dir(cfg.AWSIAMAuthenticatorPath)))
		if err = os.MkdirAll(filepath.Dir(cfg.AWSIAMAuthenticatorPath), 0700); err != nil {
			return nil, fmt.Errorf("could not create %q (%w)", filepath.Dir(cfg.AWSIAMAuthenticatorPath), err)
		}
		if !fileutil.Exist(cfg.AWSIAMAuthenticatorPath) {
			cfg.AWSIAMAuthenticatorPath = absPath(cfg.AWSIAMAuthenticatorPath)
			lg.Info("downloading aws-iam-authenticator", zap.String("aws-iam-authenticator-path", cfg.AWSIAMAuthenticatorPath))
			if err = os.RemoveAll(cfg.AWSIAMAuthenticatorPath); err != nil {
				return nil, err
			}
			if err = httputil.Download(lg, os.Stderr, cfg.AWSIAMAuthenticatorDownloadURL, cfg.AWSIAMAuthenticatorPath); err != nil {
				return nil, err
			}
		} else {
			lg.Info("skipping aws-iam-authenticator download; already exist", zap.String("aws-iam-authenticator-path", cfg.AWSIAMAuthenticatorPath))
		}
		ensureExecutable(cfg.AWSIAMAuthenticatorPath)
		// aws-iam-authenticator version
		if vo, err = toolOutput(cfg.AWSIAMAuthenticatorPath, "version"); err != nil {
			return nil, fmt.Errorf("'aws-iam-authenticator version' failed (output %q, error %w)", string(vo), err)
		}
		lg.Info(
			"aws-iam-authenticator version",
			zap.String("aws-iam-authenticator-path", cfg.AWSIAMAuthenticatorPath),
			zap.String("aws-iam-authenticator-version", string(vo)),
		)
	}

	// osSig is buffered: signal.Notify never blocks when delivering, so an
	// unbuffered channel can miss a signal sent before a receiver is ready.
	ts = &Tester{
		color:              colorize,
		stopCreationCh:     make(chan struct{}),
		stopCreationChOnce: new(sync.Once),
		osSig:              make(chan os.Signal, 1),
		downMu:             new(sync.Mutex),
		lg:                 lg,
		logWriter:          logWriter,
		logFile:            logFile,
		cfg:                cfg,
	}
	signal.Notify(ts.osSig, syscall.SIGTERM, syscall.SIGINT)
	// Every failure path below returns a nil Tester, so nothing would ever
	// read the channel; stop relaying SIGTERM/SIGINT to it in that case.
	sigCh := ts.osSig
	defer func() {
		if err != nil {
			signal.Stop(sigCh)
		}
	}()

	defer ts.cfg.Sync()

	awsCfg := pkg_aws.Config{
		Logger:        ts.lg,
		DebugAPICalls: ts.cfg.LogLevel == "debug",
		Partition:     ts.cfg.Partition,
		Region:        ts.cfg.Region,
	}
	var stsOutput *sts.GetCallerIdentityOutput
	ts.awsSession, stsOutput, ts.cfg.Status.AWSCredentialPath, err = pkg_aws.New(&awsCfg)
	if err != nil {
		return nil, err
	}
	if stsOutput != nil {
		ts.cfg.Status.AWSAccountID = aws_v2.ToString(stsOutput.Account)
		ts.cfg.Status.AWSUserID = aws_v2.ToString(stsOutput.UserId)
		ts.cfg.Status.AWSIAMRoleARN = aws_v2.ToString(stsOutput.Arn)
	}
	ts.cfg.Sync()

	ts.lg.Info("checking AWS SDK Go v2")
	awsCfgV2, err := pkg_aws.NewV2(&awsCfg)
	if err != nil {
		return nil, err
	}
	ts.stsAPIV2 = aws_sts_v2.NewFromConfig(awsCfgV2)
	// bounded like the other probes so an unreachable STS endpoint cannot hang New
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	stsOutputV2, err := ts.stsAPIV2.GetCallerIdentity(
		ctx,
		&aws_sts_v2.GetCallerIdentityInput{},
	)
	cancel()
	if err != nil {
		return nil, fmt.Errorf("failed to GetCallerIdentity %w", err)
	}
	ts.lg.Info("successfully get sts caller identity using STS SDK v2",
		zap.String("partition", cfg.Partition),
		zap.String("region", cfg.Region),
		zap.String("account-id", aws_v2.ToString(stsOutputV2.Account)),
		zap.String("user-id", aws_v2.ToString(stsOutputV2.UserId)),
		zap.String("arn", aws_v2.ToString(stsOutputV2.Arn)),
	)

	ts.iamAPI = iam.New(ts.awsSession)
	ts.iamAPIV2 = aws_iam_v2.NewFromConfig(awsCfgV2)

	ts.kmsAPI = kms.New(ts.awsSession)
	ts.kmsAPIV2 = aws_kms_v2.NewFromConfig(awsCfgV2)

	ts.ssmAPI = ssm.New(ts.awsSession)
	ts.ssmAPIV2 = aws_ssm_v2.NewFromConfig(awsCfgV2)

	ts.cfnAPI = cloudformation.New(ts.awsSession)
	ts.cfnAPIV2 = aws_cfn_v2.NewFromConfig(awsCfgV2)

	ts.ec2API = ec2.New(ts.awsSession)
	if _, err = ts.ec2API.DescribeInstances(&ec2.DescribeInstancesInput{MaxResults: aws.Int64(5)}); err != nil {
		return nil, fmt.Errorf("failed to describe instances using EC2 API v1 (%w)", err)
	}
	fmt.Fprintln(ts.logWriter, "EC2 API v1 available!")

	ts.ec2APIV2 = aws_ec2_v2.NewFromConfig(awsCfgV2)
	ctx, cancel = context.WithTimeout(context.Background(), 15*time.Second)
	_, err = ts.ec2APIV2.DescribeInstances(ctx, &aws_ec2_v2.DescribeInstancesInput{MaxResults: aws_v2.Int32(5)})
	cancel()
	if err != nil {
		return nil, fmt.Errorf("failed to describe instances using EC2 API v2 (%w)", err)
	}
	fmt.Fprintln(ts.logWriter, "EC2 API v2 available!")

	// endpoints package no longer exists in the AWS SDK for Go V2
	// "github.com/aws/aws-sdk-go/aws/endpoints" is deprecated...
	// the check will be done in "eks" with AWS API call
	// ref. https://aws.github.io/aws-sdk-go-v2/docs/migrating/
	fmt.Fprintln(ts.logWriter, "checking region...")
	ctx, cancel = context.WithTimeout(context.Background(), 15*time.Second)
	rout, err := ts.ec2APIV2.DescribeRegions(
		ctx,
		&aws_ec2_v2.DescribeRegionsInput{
			RegionNames: []string{ts.cfg.Region},
			AllRegions:  aws_v2.Bool(false),
		},
	)
	cancel()
	if err != nil {
		return nil, fmt.Errorf("failed to describe region using EC2 API v2 (%w)", err)
	}
	if len(rout.Regions) != 1 {
		return nil, fmt.Errorf("failed to describe region using EC2 API v2 (expected 1, but got %v)", rout.Regions)
	}
	ts.lg.Info("found region",
		zap.String("region-name", aws_v2.ToString(rout.Regions[0].RegionName)),
		zap.String("endpoint", aws_v2.ToString(rout.Regions[0].Endpoint)),
		zap.String("opt-in-status", aws_v2.ToString(rout.Regions[0].OptInStatus)),
	)

	// ref. https://github.com/aws/aws-k8s-tester/pull/239
	fmt.Fprintln(ts.logWriter, "checking availability zones from default subnets...")
	ctx, cancel = context.WithTimeout(context.Background(), 15*time.Second)
	dout, err := ts.ec2APIV2.DescribeSubnets(
		ctx,
		&aws_ec2_v2.DescribeSubnetsInput{
			Filters: []aws_ec2_v2_types.Filter{
				{
					Name:   aws_v2.String("default-for-az"),
					Values: []string{"true"},
				},
			},
		},
	)
	cancel()
	if err != nil {
		return nil, fmt.Errorf("failed to describe default subnets using EC2 API v2 (%w)", err)
	}
	for _, subnet := range dout.Subnets {
		ts.lg.Info("availability zones for default subnets",
			zap.String("zone-name", aws_v2.ToString(subnet.AvailabilityZone)),
			zap.String("zone-id", aws_v2.ToString(subnet.AvailabilityZoneId)),
		)
		ts.cfg.AvailabilityZoneNames = append(ts.cfg.AvailabilityZoneNames, aws_v2.ToString(subnet.AvailabilityZone))
	}
	if len(ts.cfg.AvailabilityZoneNames) == 0 {
		ts.lg.Warn("describe subnet returned empty AZ information, falling back to describe AZ API")
		ctx, cancel = context.WithTimeout(context.Background(), 15*time.Second)
		var aout *aws_ec2_v2.DescribeAvailabilityZonesOutput
		aout, err = ts.ec2APIV2.DescribeAvailabilityZones(
			ctx,
			&aws_ec2_v2.DescribeAvailabilityZonesInput{
				AllAvailabilityZones: aws_v2.Bool(false),
				Filters: []aws_ec2_v2_types.Filter{
					{
						Name:   aws_v2.String("zone-type"),
						Values: []string{"availability-zone"},
					},
				},
			},
		)
		cancel()
		if err != nil {
			return nil, fmt.Errorf("failed to describe default availability zones using EC2 API v2 (%w)", err)
		}
		for _, z := range aout.AvailabilityZones {
			ts.lg.Info("availability zone from ec2:DescribeAvailabilityZones",
				zap.String("zone-name", aws_v2.ToString(z.ZoneName)),
				zap.String("zone-id", aws_v2.ToString(z.ZoneId)),
				zap.String("zone-type", aws_v2.ToString(z.ZoneType)),
				zap.String("zone-opt-in-status", fmt.Sprintf("%+v", z.OptInStatus)),
			)
			ts.cfg.AvailabilityZoneNames = append(ts.cfg.AvailabilityZoneNames, aws_v2.ToString(z.ZoneName))
		}
	}

	// The appends above extend whatever cfg already held, so a zone could be
	// listed twice (and a nil name becomes ""). Sort, then drop duplicates and
	// empty names so the "at least two" check and the truncation below count
	// distinct zones only.
	sort.Strings(ts.cfg.AvailabilityZoneNames)
	uniqueAZs := ts.cfg.AvailabilityZoneNames[:0]
	for _, name := range ts.cfg.AvailabilityZoneNames {
		if name == "" || (len(uniqueAZs) > 0 && uniqueAZs[len(uniqueAZs)-1] == name) {
			continue
		}
		uniqueAZs = append(uniqueAZs, name)
	}
	ts.cfg.AvailabilityZoneNames = uniqueAZs
	if len(ts.cfg.AvailabilityZoneNames) < 2 {
		return nil, fmt.Errorf("too few availability zone %v (expected at least two)", ts.cfg.AvailabilityZoneNames)
	}

	// use no more zones than there are public subnet CIDRs (and at most two
	// in regions listed in useTwoAZs)
	numAZLimit := len(ts.cfg.VPC.PublicSubnetCIDRs)
	if useTwoAZs[ts.cfg.Region] && numAZLimit > 2 {
		numAZLimit = 2
	}
	if len(ts.cfg.AvailabilityZoneNames) > numAZLimit {
		ts.cfg.AvailabilityZoneNames = ts.cfg.AvailabilityZoneNames[:numAZLimit]
	}
	ts.cfg.Sync()

	ts.s3API = s3.New(ts.awsSession)
	ts.s3APIV2 = aws_s3_v2.NewFromConfig(awsCfgV2)

	ts.cwAPI = cloudwatch.New(ts.awsSession)
	ts.cwAPIV2 = aws_cw_v2.NewFromConfig(awsCfgV2)

	ts.asgAPI = autoscaling.New(ts.awsSession)
	ts.asgAPIV2 = aws_asg_v2.NewFromConfig(awsCfgV2)

	ts.elbv2API = elbv2.New(ts.awsSession)
	ts.elbv2APIV2 = aws_elbv2_v2.NewFromConfig(awsCfgV2)

	ts.lg.Info("checking ECR API v1 availability; listing repositories")
	ts.ecrAPISameRegion = ecr.New(ts.awsSession, aws.NewConfig().WithRegion(ts.cfg.Region))
	var ecrResp *ecr.DescribeRepositoriesOutput
	ecrResp, err = ts.ecrAPISameRegion.DescribeRepositories(&ecr.DescribeRepositoriesInput{
		MaxResults: aws.Int64(5),
	})
	if err != nil {
		return nil, fmt.Errorf("failed to describe repositories using ECR API (%w)", err)
	}
	ts.lg.Info("listed repositories with limit 5", zap.Int("repositories", len(ecrResp.Repositories)))
	for _, v := range ecrResp.Repositories {
		ts.lg.Info("ECR repository", zap.String("repository-uri", aws_v2.ToString(v.RepositoryUri)))
	}

	ts.lg.Info("checking ECR API v2 availability; listing repositories")
	ts.ecrAPIV2 = aws_ecr_v2.NewFromConfig(awsCfgV2)
	var ecrRespV2 *aws_ecr_v2.DescribeRepositoriesOutput
	ctx, cancel = context.WithTimeout(context.Background(), 15*time.Second)
	ecrRespV2, err = ts.ecrAPIV2.DescribeRepositories(ctx, &aws_ecr_v2.DescribeRepositoriesInput{
		MaxResults: aws_v2.Int32(5),
	})
	cancel()
	if err != nil {
		return nil, fmt.Errorf("failed to describe repositories using ECR API (%w)", err)
	}
	ts.lg.Info("listed repositories with limit 5", zap.Int("repositories", len(ecrRespV2.Repositories)))
	for _, v := range ecrRespV2.Repositories {
		ts.lg.Info("ECR repository", zap.String("repository-uri", aws_v2.ToString(v.RepositoryUri)))
	}

	// eksAWSConfig returns a new pkg_aws.Config for an EKS client that uses
	// the given resolver URL and signing name; each call allocates a fresh
	// value so no two session/config constructors share one.
	eksAWSConfig := func(resolverURL, signingName string) *pkg_aws.Config {
		return &pkg_aws.Config{
			Logger:        ts.lg,
			DebugAPICalls: ts.cfg.LogLevel == "debug",
			Partition:     ts.cfg.Partition,
			Region:        ts.cfg.Region,
			ResolverURL:   resolverURL,
			SigningName:   signingName,
		}
	}

	// create a separate session for EKS (for resolver endpoint)
	var eksSessionForCluster *session.Session
	eksSessionForCluster, _, ts.cfg.Status.AWSCredentialPath, err = pkg_aws.New(eksAWSConfig(ts.cfg.ResolverURL, ts.cfg.SigningName))
	if err != nil {
		return nil, err
	}
	ts.eksAPIForCluster = aws_eks.New(eksSessionForCluster)

	awsCfgV2EKS, err := pkg_aws.NewV2(eksAWSConfig(ts.cfg.ResolverURL, ts.cfg.SigningName))
	if err != nil {
		return nil, err
	}
	ts.eksAPIForClusterV2 = aws_eks_v2.NewFromConfig(awsCfgV2EKS)

	if ts.cfg.IsEnabledAddOnManagedNodeGroups() {
		mngResolverURL := ts.cfg.AddOnManagedNodeGroups.ResolverURL
		mngSigningName := ts.cfg.AddOnManagedNodeGroups.SigningName

		var eksSessionForMNG *session.Session
		eksSessionForMNG, _, ts.cfg.Status.AWSCredentialPath, err = pkg_aws.New(eksAWSConfig(mngResolverURL, mngSigningName))
		if err != nil {
			return nil, err
		}
		ts.eksAPIForMNG = aws_eks.New(eksSessionForMNG)

		if awsCfgV2EKS, err = pkg_aws.NewV2(eksAWSConfig(mngResolverURL, mngSigningName)); err != nil {
			return nil, err
		}
		ts.eksAPIForMNGV2 = aws_eks_v2.NewFromConfig(awsCfgV2EKS)
	}

	ts.lg.Info("checking EKS API v1 availability; listing clusters")
	var eksListResp *aws_eks.ListClustersOutput
	eksListResp, err = ts.eksAPIForCluster.ListClusters(&aws_eks.ListClustersInput{
		MaxResults: aws.Int64(20),
	})
	if err != nil {
		return nil, fmt.Errorf("failed to list clusters using EKS API v1 (%w)", err)
	}
	ts.lg.Info("listed clusters with limit 20 with v1", zap.Int("clusters", len(eksListResp.Clusters)))
	for _, v := range eksListResp.Clusters {
		ts.lg.Info("EKS cluster", zap.String("name", aws_v2.ToString(v)))
	}

	ts.lg.Info("checking EKS API v2 availability; listing clusters")
	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
	eksListRespV2, err := ts.eksAPIForClusterV2.ListClusters(
		ctx,
		&aws_eks_v2.ListClustersInput{
			MaxResults: aws_v2.Int32(20),
		},
	)
	cancel()
	if err != nil {
		// the v2 listing is informational only; a failure here does not fail New
		ts.lg.Warn("failed to list clusters using EKS API v2", zap.Error(err))
	} else {
		ts.lg.Info("listed clusters with limit 20 with v2", zap.Int("clusters", len(eksListRespV2.Clusters)))
		for _, v := range eksListRespV2.Clusters {
			ts.lg.Info("EKS cluster", zap.String("name", v))
		}
	}

	// update k8s client if cluster has already been created
	ts.lg.Info("creating k8s client from previous states if any")
	kcfg := &k8s_client.EKSConfig{
		Logger:                             ts.lg,
		Region:                             ts.cfg.Region,
		ClusterName:                        ts.cfg.Name,
		KubeConfigPath:                     ts.cfg.KubeConfigPath,
		KubectlPath:                        ts.cfg.KubectlPath,
		ServerVersion:                      ts.cfg.Version,
		EncryptionEnabled:                  ts.cfg.Encryption.CMKARN != "",
		S3API:                              ts.s3API,
		S3BucketName:                       ts.cfg.S3.BucketName,
		S3MetricsRawOutputDirKubeAPIServer: path.Join(ts.cfg.Name, "metrics-kube-apiserver"),
		MetricsRawOutputDirKubeAPIServer:   filepath.Join(filepath.Dir(ts.cfg.ConfigPath), ts.cfg.Name+"-metrics-kube-apiserver"),
		Clients:                            ts.cfg.Clients,
		ClientQPS:                          ts.cfg.ClientQPS,
		ClientBurst:                        ts.cfg.ClientBurst,
		ClientTimeout:                      ts.cfg.ClientTimeout,
	}
	if ts.cfg.IsEnabledAddOnClusterVersionUpgrade() {
		kcfg.UpgradeServerVersion = ts.cfg.AddOnClusterVersionUpgrade.Version
	}
	if ts.cfg.Status != nil {
		kcfg.ClusterAPIServerEndpoint = ts.cfg.Status.ClusterAPIServerEndpoint
		kcfg.ClusterCADecoded = ts.cfg.Status.ClusterCADecoded
	}
	// in case cluster has already been created
	ts.k8sClient, err = k8s_client.NewEKS(kcfg)
	if err != nil {
		ts.lg.Warn("failed to create k8s client from previous states", zap.Error(err))
	} else {
		ts.lg.Info("created k8s client from previous states")
		// call here, because "createCluster" won't be called
		// if loaded from previous states
		// e.g. delete
		if err = ts.createTesters(); err != nil {
			return nil, err
		}
	}

	return ts, nil
}