pkg/provider/eks/eks.go (465 lines of code) (raw):
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package eks
import (
"context"
"encoding/base64"
"fmt"
"log"
"os"
"regexp"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials"
awsSession "github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/eks"
"github.com/pkg/errors"
"gopkg.in/alecthomas/kingpin.v2"
yamlGo "gopkg.in/yaml.v2"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
awsToken "sigs.k8s.io/aws-iam-authenticator/pkg/token"
"github.com/prometheus/test-infra/pkg/provider"
k8sProvider "github.com/prometheus/test-infra/pkg/provider/k8s"
)
// Resource is an alias for provider.Resource. The EKS type keeps the parsed
// EKS deployment files in a slice of this type (see EKS.eksResources).
type Resource = provider.Resource
// eksCluster is the structure every EKS deployment file is unmarshalled into:
// one cluster creation request plus the node group creation requests that
// belong to that cluster.
type eksCluster struct {
	// Cluster is the request handed to the EKS CreateCluster call.
	Cluster eks.CreateClusterInput
	// NodeGroups are the requests handed to the EKS CreateNodegroup call; their
	// ClusterName is overwritten with Cluster.Name before they are sent.
	NodeGroups []eks.CreateNodegroupInput
}
// EKS holds the fields used to generate an API request.
type EKS struct {
	// Auth is the credentials input. NewEKSClient accepts a file path, the raw
	// YAML content or base64-encoded YAML content here and overwrites the field
	// with the decoded content.
	Auth string
	// ClusterName is not referenced by the methods in this file; presumably it
	// is populated by a CLI flag — TODO confirm against the caller.
	ClusterName string

	// The eks client used when performing EKS requests.
	clientEKS *eks.EKS
	// The aws session built from the credentials in Auth; it is also used when
	// generating the k8s access token.
	sessionAWS *awsSession.Session
	// The k8s provider used when we work with the manifest files.
	k8sProvider *k8sProvider.K8s

	// DeploymentFiles is the final list of deployment files, copied from
	// DeploymentResource by SetupDeploymentResources.
	DeploymentFiles []string
	// DeploymentVars is the final set of deployment variables: the default
	// variables merged with the ones passed as flags.
	DeploymentVars map[string]string
	// DeploymentResource is the source DeploymentVars and DeploymentFiles are
	// constructed from.
	DeploymentResource *provider.DeploymentResource
	// Content bytes after parsing the template variables, grouped by filename.
	eksResources []Resource
	// K8s resource.runtime objects after parsing the template variables, grouped by filename.
	k8sResources []k8sProvider.Resource

	// ctx is set to context.Background() by NewEKSClient and handed to the k8s
	// provider constructor.
	ctx context.Context
}
// New returns an EKS provider whose deployment files and variables will be
// taken from dr. No AWS client or session is created here.
func New(dr *provider.DeploymentResource) *EKS {
	return &EKS{DeploymentResource: dr}
}
// NewEKSClient sets the EKS client used when performing the EKS requests.
//
// The credentials come from c.Auth or, when that is empty, from the
// AWS_APPLICATION_CREDENTIALS environment variable. The value may be a path to
// a file, the raw YAML content, or the base64-encoded YAML content; after this
// call c.Auth holds the decoded YAML. The session region is read from the ZONE
// deployment variable.
//
// It returns an error when no credentials are provided, when they cannot be
// decoded or parsed, or when the AWS session cannot be created.
func (c *EKS) NewEKSClient(*kingpin.ParseContext) error {
	if c.Auth == "" {
		c.Auth = os.Getenv("AWS_APPLICATION_CREDENTIALS")
	}
	if c.Auth == "" {
		return errors.Errorf("no auth provided set the auth flag or the AWS_APPLICATION_CREDENTIALS env variable")
	}

	// When the auth variable points to a file
	// put the file content in the variable.
	// A read error is deliberately ignored: the value is then treated as the
	// credentials content itself.
	if content, err := os.ReadFile(c.Auth); err == nil {
		c.Auth = string(content)
	}

	// Check if auth data is base64 encoded and decode it.
	encoded, err := regexp.MatchString("^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?$", c.Auth)
	if err != nil {
		return err
	}
	if encoded {
		auth, err := base64.StdEncoding.DecodeString(c.Auth)
		if err != nil {
			return errors.Wrap(err, "could not decode auth data")
		}
		c.Auth = string(auth)
	}

	credValue := &credentials.Value{}
	if err = yamlGo.UnmarshalStrict([]byte(c.Auth), credValue); err != nil {
		return errors.Wrap(err, "could not get credential values")
	}

	// Report a session failure to the caller instead of panicking.
	awsSess, err := awsSession.NewSession(&aws.Config{
		Credentials: credentials.NewStaticCredentialsFromCreds(*credValue),
		Region:      aws.String(c.DeploymentVars["ZONE"]),
	})
	if err != nil {
		return errors.Wrap(err, "could not create aws session")
	}

	c.sessionAWS = awsSess
	c.clientEKS = eks.New(awsSess)
	c.ctx = context.Background()
	return nil
}
// checkDeploymentVarsAndFiles verifies that the required deployment variables
// (ZONE and CLUSTER_NAME) are non-empty and that at least one deployment file
// was provided. The variables are checked before the files.
func (c *EKS) checkDeploymentVarsAndFiles() error {
	for _, name := range [...]string{"ZONE", "CLUSTER_NAME"} {
		if c.DeploymentVars[name] == "" {
			return fmt.Errorf("missing required %v variable", name)
		}
	}

	if len(c.DeploymentFiles) > 0 {
		return nil
	}
	return fmt.Errorf("missing deployment file(s)")
}
// SetupDeploymentResources populates DeploymentFiles and DeploymentVars from
// DeploymentResource. The variables are the result of merging the default
// deployment variables with the ones passed as flags. It always returns nil.
func (c *EKS) SetupDeploymentResources(*kingpin.ParseContext) error {
	source := c.DeploymentResource

	c.DeploymentFiles = source.DeploymentFiles
	c.DeploymentVars = provider.MergeDeploymentVars(source.DefaultDeploymentVars, source.FlagDeploymentVars)
	return nil
}
// EKSDeploymentParse parses the cluster/nodegroups deployment files and stores
// the resulting content, grouped by filename, in c.eksResources.
// Variables passed to the cli are substituted in the resource files following
// the golang text template format.
// Nothing is stored when validation or parsing fails.
func (c *EKS) EKSDeploymentParse(*kingpin.ParseContext) error {
	err := c.checkDeploymentVarsAndFiles()
	if err != nil {
		return err
	}

	parsed, err := provider.DeploymentsParse(c.DeploymentFiles, c.DeploymentVars)
	if err != nil {
		return fmt.Errorf("Couldn't parse deployment files: %v", err)
	}

	c.eksResources = parsed
	return nil
}
// K8SDeploymentsParse parses the k8s objects deployment files and saves the result as k8s objects grouped by the filename.
// Any variables passed to the cli will be replaced in the resources files following the golang text template format.
//
// Each file is split on provider.Separator and every non-empty section is
// decoded into a runtime.Object. Files that yield no objects are skipped.
// It returns an error when validation, template parsing or decoding fails.
func (c *EKS) K8SDeploymentsParse(*kingpin.ParseContext) error {
	if err := c.checkDeploymentVarsAndFiles(); err != nil {
		return err
	}

	deploymentResource, err := provider.DeploymentsParse(c.DeploymentFiles, c.DeploymentVars)
	if err != nil {
		return fmt.Errorf("Couldn't parse deployment files: %v", err)
	}

	// maxSnippetLen bounds how much of an undecodable section is quoted in the
	// returned error.
	const maxSnippetLen = 100

	decode := scheme.Codecs.UniversalDeserializer().Decode
	for _, deployment := range deploymentResource {
		k8sObjects := make([]runtime.Object, 0)

		for _, text := range strings.Split(string(deployment.Content), provider.Separator) {
			text = strings.TrimSpace(text)
			if len(text) == 0 {
				continue
			}

			resource, _, err := decode([]byte(text), nil, nil)
			if err != nil {
				// Sections shorter than the limit must not be sliced past their
				// end, otherwise the decode error is replaced by a panic.
				snippet := text
				if len(snippet) > maxSnippetLen {
					snippet = snippet[:maxSnippetLen]
				}
				return errors.Wrapf(err, "decoding the resource file:%v, section:%v...", deployment.FileName, snippet)
			}
			if resource == nil {
				continue
			}
			k8sObjects = append(k8sObjects, resource)
		}

		if len(k8sObjects) > 0 {
			c.k8sResources = append(c.k8sResources, k8sProvider.Resource{FileName: deployment.FileName, Objects: k8sObjects})
		}
	}
	return nil
}
// ClusterCreate creates the cluster described by every parsed EKS deployment
// file, waits until it is active, and then creates each of its node groups,
// waiting for every node group to become active before moving on.
//
// It returns an error when a deployment file cannot be unmarshalled, when a
// create request is rejected, or when waiting for a resource fails.
func (c *EKS) ClusterCreate(*kingpin.ParseContext) error {
	req := &eksCluster{}
	for _, deployment := range c.eksResources {
		if err := yamlGo.UnmarshalStrict(deployment.Content, req); err != nil {
			return fmt.Errorf("Error parsing the cluster deployment file %s:%v", deployment.FileName, err)
		}

		// aws.StringValue yields "" for a nil pointer, so a deployment file
		// without a name does not cause a nil dereference here.
		clusterName := aws.StringValue(req.Cluster.Name)

		log.Printf("Cluster create request: name:'%s'", clusterName)
		if _, err := c.clientEKS.CreateCluster(&req.Cluster); err != nil {
			return fmt.Errorf("Couldn't create cluster '%v', file:%v ,err: %v", clusterName, deployment.FileName, err)
		}

		err := provider.RetryUntilTrue(
			fmt.Sprintf("creating cluster:%v", clusterName),
			provider.EKSRetryCount,
			func() (bool, error) { return c.clusterRunning(clusterName) },
		)
		if err != nil {
			return fmt.Errorf("creating cluster err:%v", err)
		}

		for _, nodegroupReq := range req.NodeGroups {
			// The node group always belongs to the cluster of the same file.
			nodegroupReq.ClusterName = req.Cluster.Name
			nodegroupName := aws.StringValue(nodegroupReq.NodegroupName)

			log.Printf("Nodegroup create request: NodeGroupName: '%s', ClusterName: '%s'", nodegroupName, clusterName)
			if _, err := c.clientEKS.CreateNodegroup(&nodegroupReq); err != nil {
				// Format the names, not the *string pointers.
				return fmt.Errorf("Couldn't create nodegroup '%v' for cluster '%v', file:%v ,err: %v", nodegroupName, clusterName, deployment.FileName, err)
			}

			err := provider.RetryUntilTrue(
				fmt.Sprintf("creating nodegroup:%s for cluster:%s", nodegroupName, clusterName),
				provider.EKSRetryCount,
				func() (bool, error) { return c.nodeGroupCreated(nodegroupName, clusterName) },
			)
			if err != nil {
				return fmt.Errorf("creating nodegroup err:%v", err)
			}
		}
	}
	return nil
}
// ClusterDelete deletes the EKS cluster described by every parsed deployment
// file. All node groups currently listed for the cluster are deleted first
// (one at a time, waiting for each deletion to finish), then the cluster
// itself is deleted and waited for.
//
// It returns an error when a deployment file cannot be unmarshalled, when a
// list or delete request is rejected, or when waiting for a deletion fails.
func (c *EKS) ClusterDelete(*kingpin.ParseContext) error {
	req := &eksCluster{}
	for _, deployment := range c.eksResources {
		if err := yamlGo.UnmarshalStrict(deployment.Content, req); err != nil {
			return fmt.Errorf("Error parsing the cluster deployment file %s:%v", deployment.FileName, err)
		}

		// aws.StringValue yields "" for a nil pointer, so a deployment file
		// without a name does not cause a nil dereference here.
		clusterName := aws.StringValue(req.Cluster.Name)

		// To delete a cluster we have to manually delete all of its nodegroups first.
		log.Printf("Removing all nodepools for '%s'", clusterName)

		// Listing all nodepools for cluster, following NextToken page by page.
		reqL := &eks.ListNodegroupsInput{
			ClusterName: req.Cluster.Name,
		}
		for {
			resL, err := c.clientEKS.ListNodegroups(reqL)
			if err != nil {
				return fmt.Errorf("listing nodepools err:%v", err)
			}

			for _, nodegroup := range resL.Nodegroups {
				nodegroupName := aws.StringValue(nodegroup)
				log.Printf("Removing nodepool '%s' in cluster '%s'", nodegroupName, clusterName)

				reqD := eks.DeleteNodegroupInput{
					ClusterName:   req.Cluster.Name,
					NodegroupName: nodegroup,
				}
				if _, err := c.clientEKS.DeleteNodegroup(&reqD); err != nil {
					return fmt.Errorf("Couldn't delete nodegroup '%v' for cluster '%v' ,err: %v", nodegroupName, clusterName, err)
				}

				err := provider.RetryUntilTrue(
					fmt.Sprintf("deleting nodegroup:%v for cluster:%v", nodegroupName, clusterName),
					provider.GlobalRetryCount,
					func() (bool, error) { return c.nodeGroupDeleted(nodegroupName, clusterName) },
				)
				if err != nil {
					return fmt.Errorf("deleting nodegroup err:%v", err)
				}
			}

			if resL.NextToken == nil {
				break
			}
			reqL.NextToken = resL.NextToken
		}

		reqD := &eks.DeleteClusterInput{
			Name: req.Cluster.Name,
		}
		log.Printf("Removing cluster '%v'", clusterName)
		if _, err := c.clientEKS.DeleteCluster(reqD); err != nil {
			return fmt.Errorf("Couldn't delete cluster '%v', file:%v ,err: %v", clusterName, deployment.FileName, err)
		}

		err := provider.RetryUntilTrue(
			fmt.Sprintf("deleting cluster:%v", clusterName),
			provider.GlobalRetryCount,
			func() (bool, error) { return c.clusterDeleted(clusterName) })
		if err != nil {
			return fmt.Errorf("removing cluster err:%v", err)
		}
	}
	return nil
}
// clusterRunning checks whether a cluster is in an active state.
//
// It returns (true, nil) when the cluster status is ACTIVE, (false, nil) when
// the cluster is not found or is in any other non-failed status, and an error
// when the status is FAILED or the describe request fails for another reason.
func (c *EKS) clusterRunning(name string) (bool, error) {
	req := &eks.DescribeClusterInput{
		Name: aws.String(name),
	}
	clusterRes, err := c.clientEKS.DescribeCluster(req)
	if err != nil {
		if aerr, ok := err.(awserr.Error); ok {
			// clusterDeleted matches ResourceNotFoundException for the same
			// DescribeCluster call, so both "not found" codes are treated as
			// "not running yet" here.
			switch aerr.Code() {
			case eks.ErrCodeNotFoundException, eks.ErrCodeResourceNotFoundException:
				return false, nil
			}
		}
		return false, fmt.Errorf("Couldn't get cluster status: %v", err)
	}

	// aws.StringValue yields "" for a nil status instead of panicking.
	switch status := aws.StringValue(clusterRes.Cluster.Status); status {
	case eks.ClusterStatusFailed:
		return false, fmt.Errorf("Cluster not in a status to become ready - %s", status)
	case eks.ClusterStatusActive:
		return true, nil
	default:
		log.Printf("Cluster '%v' status: %v", name, status)
		return false, nil
	}
}
// clusterDeleted reports whether the named cluster is gone.
//
// It returns (true, nil) when describing the cluster fails with
// ResourceNotFoundException, (false, nil) while the cluster can still be
// described (its status is logged), and an error for any other failure.
func (c *EKS) clusterDeleted(name string) (bool, error) {
	described, err := c.clientEKS.DescribeCluster(&eks.DescribeClusterInput{Name: aws.String(name)})
	if err == nil {
		log.Printf("Cluster '%v' status: %v", name, *described.Cluster.Status)
		return false, nil
	}

	awsErr, isAWSErr := err.(awserr.Error)
	if isAWSErr && awsErr.Code() == eks.ErrCodeResourceNotFoundException {
		return true, nil
	}
	return false, fmt.Errorf("Couldn't get cluster status: %v", err)
}
// NodeGroupCreate creates every node group listed in the parsed EKS deployment
// files inside the cluster named by the same parsed structure, waiting for
// each node group to become active before creating the next one.
func (c *EKS) NodeGroupCreate(*kingpin.ParseContext) error {
	spec := &eksCluster{}
	for _, file := range c.eksResources {
		if err := yamlGo.UnmarshalStrict(file.Content, spec); err != nil {
			return fmt.Errorf("Error parsing the cluster deployment file %s:%v", file.FileName, err)
		}

		for _, ng := range spec.NodeGroups {
			ng.ClusterName = spec.Cluster.Name
			ngName := *ng.NodegroupName
			clusterName := *spec.Cluster.Name

			log.Printf("Nodegroup create request: NodeGroupName: '%s', ClusterName: '%s'", ngName, clusterName)
			if _, err := c.clientEKS.CreateNodegroup(&ng); err != nil {
				return fmt.Errorf("Couldn't create nodegroup '%s' for cluster '%s', file:%v ,err: %v", ngName, clusterName, file.FileName, err)
			}

			// Block until the node group reports an active status.
			waitErr := provider.RetryUntilTrue(
				fmt.Sprintf("creating nodegroup:%s for cluster:%s", ngName, clusterName),
				provider.GlobalRetryCount,
				func() (bool, error) { return c.nodeGroupCreated(ngName, clusterName) },
			)
			if waitErr != nil {
				return fmt.Errorf("creating nodegroup err:%v", waitErr)
			}
		}
	}
	return nil
}
// NodeGroupDelete deletes every node group listed in the parsed EKS deployment
// files from the cluster named by the same parsed structure, waiting for each
// deletion to finish before starting the next one.
//
// It returns an error when a deployment file cannot be unmarshalled, when a
// delete request is rejected, or when waiting for a deletion fails.
func (c *EKS) NodeGroupDelete(*kingpin.ParseContext) error {
	req := &eksCluster{}
	for _, deployment := range c.eksResources {
		if err := yamlGo.UnmarshalStrict(deployment.Content, req); err != nil {
			return fmt.Errorf("Error parsing the cluster deployment file %s:%v", deployment.FileName, err)
		}

		// aws.StringValue yields "" for a nil pointer, so a deployment file
		// without a name does not cause a nil dereference here.
		clusterName := aws.StringValue(req.Cluster.Name)

		for _, nodegroupReq := range req.NodeGroups {
			nodegroupName := aws.StringValue(nodegroupReq.NodegroupName)
			log.Printf("Nodegroup delete request: NodeGroupName: '%s', ClusterName: '%s'", nodegroupName, clusterName)

			reqD := eks.DeleteNodegroupInput{
				ClusterName:   req.Cluster.Name,
				NodegroupName: nodegroupReq.NodegroupName,
			}
			if _, err := c.clientEKS.DeleteNodegroup(&reqD); err != nil {
				return fmt.Errorf("Couldn't delete nodegroup '%s' for cluster '%s', file:%v ,err: %v", nodegroupName, clusterName, deployment.FileName, err)
			}

			err := provider.RetryUntilTrue(
				fmt.Sprintf("deleting nodegroup:%s for cluster:%s", nodegroupName, clusterName),
				provider.GlobalRetryCount,
				func() (bool, error) { return c.nodeGroupDeleted(nodegroupName, clusterName) },
			)
			if err != nil {
				return fmt.Errorf("deleting nodegroup err:%v", err)
			}
		}
	}
	return nil
}
// nodeGroupCreated checks whether a node group of the given cluster is in an
// active state.
//
// It returns (true, nil) when the node group status is ACTIVE, (false, nil)
// when the node group is not found or is in any other status (the status is
// logged), and an error when the describe request fails for another reason.
func (c *EKS) nodeGroupCreated(nodegroupName, clusterName string) (bool, error) {
	req := &eks.DescribeNodegroupInput{
		ClusterName:   aws.String(clusterName),
		NodegroupName: aws.String(nodegroupName),
	}
	nodegroupRes, err := c.clientEKS.DescribeNodegroup(req)
	if err != nil {
		if aerr, ok := err.(awserr.Error); ok {
			// nodeGroupDeleted matches ResourceNotFoundException for the same
			// DescribeNodegroup call, so both "not found" codes are treated as
			// "not created yet" here.
			switch aerr.Code() {
			case eks.ErrCodeNotFoundException, eks.ErrCodeResourceNotFoundException:
				return false, nil
			}
		}
		return false, fmt.Errorf("Couldn't get nodegroupname status: %v", err)
	}

	// aws.StringValue yields "" for a nil status instead of panicking.
	status := aws.StringValue(nodegroupRes.Nodegroup.Status)
	if status == eks.NodegroupStatusActive {
		return true, nil
	}
	log.Printf("Nodegroup '%v' for Cluster '%v' status: %v", nodegroupName, clusterName, status)
	return false, nil
}
// nodeGroupDeleted reports whether the named node group of the given cluster
// is gone.
//
// It returns (true, nil) when describing the node group fails with
// ResourceNotFoundException, (false, nil) while the node group can still be
// described (its status is logged), and an error for any other failure.
func (c *EKS) nodeGroupDeleted(nodegroupName, clusterName string) (bool, error) {
	described, err := c.clientEKS.DescribeNodegroup(&eks.DescribeNodegroupInput{
		ClusterName:   aws.String(clusterName),
		NodegroupName: aws.String(nodegroupName),
	})
	if err == nil {
		log.Printf("Nodegroup '%v' for Cluster '%v' status: %v", nodegroupName, clusterName, *described.Nodegroup.Status)
		return false, nil
	}

	awsErr, isAWSErr := err.(awserr.Error)
	if isAWSErr && awsErr.Code() == eks.ErrCodeResourceNotFoundException {
		return true, nil
	}
	return false, fmt.Errorf("Couldn't get nodegroupname status: %v", err)
}
// AllNodeGroupsRunning returns an error if at least one node group listed in
// the parsed EKS deployment files is not in an active state, or if the state
// of a node group cannot be fetched. The fetch error is wrapped so its cause
// is not lost.
func (c *EKS) AllNodeGroupsRunning(*kingpin.ParseContext) error {
	req := &eksCluster{}
	for _, deployment := range c.eksResources {
		if err := yamlGo.UnmarshalStrict(deployment.Content, req); err != nil {
			return fmt.Errorf("Error parsing the cluster deployment file %s:%v", deployment.FileName, err)
		}

		// aws.StringValue yields "" for a nil pointer, so a deployment file
		// without a name does not cause a nil dereference here.
		clusterName := aws.StringValue(req.Cluster.Name)

		for _, nodegroup := range req.NodeGroups {
			nodegroupName := aws.StringValue(nodegroup.NodegroupName)

			isRunning, err := c.nodeGroupCreated(nodegroupName, clusterName)
			if err != nil {
				return fmt.Errorf("error fetching nodegroup info: %w", err)
			}
			if !isRunning {
				return fmt.Errorf("nodepool not running name: %v", nodegroupName)
			}
		}
	}
	return nil
}
// AllNodeGroupsDeleted returns an error if at least one node group listed in
// the parsed EKS deployment files still exists, or if the state of a node
// group cannot be fetched. The fetch error is wrapped so its cause is not
// lost.
func (c *EKS) AllNodeGroupsDeleted(*kingpin.ParseContext) error {
	req := &eksCluster{}
	for _, deployment := range c.eksResources {
		if err := yamlGo.UnmarshalStrict(deployment.Content, req); err != nil {
			return fmt.Errorf("Error parsing the cluster deployment file %s:%v", deployment.FileName, err)
		}

		// aws.StringValue yields "" for a nil pointer, so a deployment file
		// without a name does not cause a nil dereference here.
		clusterName := aws.StringValue(req.Cluster.Name)

		for _, nodegroup := range req.NodeGroups {
			nodegroupName := aws.StringValue(nodegroup.NodegroupName)

			isDeleted, err := c.nodeGroupDeleted(nodegroupName, clusterName)
			if err != nil {
				return fmt.Errorf("error fetching nodegroup info: %w", err)
			}
			if !isDeleted {
				// The node group can still be described, i.e. it is not gone.
				return fmt.Errorf("nodepool not deleted name: %v", nodegroupName)
			}
		}
	}
	return nil
}
// EKSK8sToken returns the aws iam authenticator token used to access the eks
// k8s cluster from outside. The token is generated for clusterName using the
// AWS session stored on c. The region argument is accepted but not used by
// this function.
//
// Any failure is reported through log.Fatalf, which terminates the process.
func (c *EKS) EKSK8sToken(clusterName, region string) awsToken.Token {
	generator, err := awsToken.NewGenerator(true, false)
	if err != nil {
		log.Fatalf("Token abstraction error: %v", err)
	}

	token, err := generator.GetWithOptions(&awsToken.GetTokenOptions{
		ClusterID: clusterName,
		Session:   c.sessionAWS,
	})
	if err != nil {
		log.Fatalf("Token abstraction error: %v", err)
	}
	return token
}
// NewK8sProvider sets the k8s provider used for deploying k8s manifests.
//
// It describes the cluster named by the CLUSTER_NAME deployment variable and
// builds an in-memory client config from the cluster endpoint, its decoded
// certificate authority data and a freshly generated access token. The
// cluster ARN is used as the key for the cluster, auth info and context
// entries and as the current context.
func (c *EKS) NewK8sProvider(*kingpin.ParseContext) error {
	name := c.DeploymentVars["CLUSTER_NAME"]
	zone := c.DeploymentVars["ZONE"]

	described, err := c.clientEKS.DescribeCluster(&eks.DescribeClusterInput{Name: &name})
	if err != nil {
		return fmt.Errorf("failed to get cluster details: %v", err)
	}

	arn := *described.Cluster.Arn
	caData, err := base64.StdEncoding.DecodeString(*described.Cluster.CertificateAuthority.Data)
	if err != nil {
		return fmt.Errorf("failed to decode certificate: %v", err)
	}

	kubeCluster := clientcmdapi.NewCluster()
	kubeCluster.CertificateAuthorityData = caData
	kubeCluster.Server = *described.Cluster.Endpoint

	kubeContext := clientcmdapi.NewContext()
	kubeContext.Cluster = arn
	kubeContext.AuthInfo = arn

	kubeAuth := clientcmdapi.NewAuthInfo()
	kubeAuth.Token = c.EKSK8sToken(name, zone).Token

	kubeConfig := clientcmdapi.NewConfig()
	kubeConfig.AuthInfos[arn] = kubeAuth
	kubeConfig.Contexts[arn] = kubeContext
	kubeConfig.Clusters[arn] = kubeCluster
	kubeConfig.CurrentContext = arn
	kubeConfig.Kind = "Config"
	kubeConfig.APIVersion = "v1"

	if c.k8sProvider, err = k8sProvider.New(c.ctx, kubeConfig); err != nil {
		return fmt.Errorf("k8s provider error %v", err)
	}
	return nil
}
// ResourceApply hands the parsed k8s objects of the manifest files to the k8s
// provider's ResourceApply and wraps any error it returns.
func (c *EKS) ResourceApply(*kingpin.ParseContext) error {
	err := c.k8sProvider.ResourceApply(c.k8sResources)
	if err == nil {
		return nil
	}
	return fmt.Errorf("error while applying a resource err: %v", err)
}
// ResourceDelete hands the parsed k8s objects of the manifest files to the k8s
// provider's ResourceDelete and wraps any error it returns.
func (c *EKS) ResourceDelete(*kingpin.ParseContext) error {
	err := c.k8sProvider.ResourceDelete(c.k8sResources)
	if err == nil {
		return nil
	}
	return fmt.Errorf("error while deleting objects from a manifest file err: %v", err)
}
// GetDeploymentVars prints a header followed by one "key : value" line per
// deployment variable to standard output. The lines follow map iteration
// order. It always returns nil.
func (c *EKS) GetDeploymentVars(*kingpin.ParseContext) error {
	fmt.Print("-------------------\n DeploymentVars \n------------------- \n")

	for name := range c.DeploymentVars {
		fmt.Println(name, " : ", c.DeploymentVars[name])
	}
	return nil
}