pkg/provider/gke/gke.go (444 lines of code) (raw):
// Copyright 2019 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package gke
import (
"context"
"encoding/base64"
"fmt"
"log"
"os"
"regexp"
"strings"
gke "cloud.google.com/go/container/apiv1"
"cloud.google.com/go/container/apiv1/containerpb"
"github.com/pkg/errors"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gopkg.in/alecthomas/kingpin.v2"
yamlGo "gopkg.in/yaml.v2"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"github.com/prometheus/test-infra/pkg/provider"
k8sProvider "github.com/prometheus/test-infra/pkg/provider/k8s"
)
// New is the GKE constructor.
func New(dr *provider.DeploymentResource) *GKE {
return &GKE{
DeploymentResource: dr,
}
}
type Resource = provider.Resource
// GKE holds the fields used to generate an API request.
type GKE struct {
// The auth used to authenticate the cli.
// Can be a file path or an env variable that includes the json data.
Auth string
// The project id for all requests.
ProjectID string
// The gke client used when performing GKE requests.
clientGKE *gke.ClusterManagerClient
// The k8s provider used when we work with the manifest files.
k8sProvider *k8sProvider.K8s
// Final DeploymentFiles files.
DeploymentFiles []string
// Final DeploymentVars.
DeploymentVars map[string]string
// DeployResource to construct DeploymentVars and DeploymentFiles
DeploymentResource *provider.DeploymentResource
// Content bytes after parsing the template variables, grouped by filename.
gkeResources []Resource
// K8s resource.runtime objects after parsing the template variables, grouped by filename.
k8sResources []k8sProvider.Resource
ctx context.Context
}
// NewGKEClient sets the GKE client used when performing GKE requests.
func (c *GKE) NewGKEClient(*kingpin.ParseContext) error {
// Set the auth env variable needed to the gke client.
if c.Auth != "" {
} else if c.Auth = os.Getenv("GOOGLE_APPLICATION_CREDENTIALS"); c.Auth == "" {
return errors.Errorf("no auth provided! Need to either set the auth flag or the GOOGLE_APPLICATION_CREDENTIALS env variable")
}
// When the auth variable points to a file
// put the file content in the variable.
if content, err := os.ReadFile(c.Auth); err == nil {
c.Auth = string(content)
}
// Check if auth data is base64 encoded and decode it.
encoded, err := regexp.MatchString("^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?$", c.Auth)
if err != nil {
return err
}
if encoded {
auth, err := base64.StdEncoding.DecodeString(c.Auth)
if err != nil {
return errors.Wrap(err, "could not decode auth data")
}
c.Auth = string(auth)
}
// Create temporary file to store the credentials.
saFile, err := os.CreateTemp("", "service-account")
if err != nil {
return errors.Wrap(err, "could not create temp file")
}
defer saFile.Close()
if _, err := saFile.Write([]byte(c.Auth)); err != nil {
return errors.Wrap(err, "could not write to temp file")
}
// Set the auth env variable needed to the k8s client.
// The client looks for this special variable name and it is the only way to set the auth for now.
// TODO: Remove when the client supports an auth config option in NewDefaultClientConfig.
// https://github.com/kubernetes/kubernetes/pull/80303
os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", saFile.Name())
opts := option.WithCredentialsJSON([]byte(c.Auth))
cl, err := gke.NewClusterManagerClient(context.Background(), opts)
if err != nil {
return errors.Wrap(err, "could not create the gke client")
}
c.clientGKE = cl
c.ctx = context.Background()
return nil
}
// SetupDeploymentResources Sets up DeploymentVars and DeploymentFiles
func (c *GKE) SetupDeploymentResources(*kingpin.ParseContext) error {
c.DeploymentFiles = c.DeploymentResource.DeploymentFiles
c.DeploymentVars = provider.MergeDeploymentVars(
c.DeploymentResource.DefaultDeploymentVars,
c.DeploymentResource.FlagDeploymentVars,
)
return nil
}
// GKEDeploymentsParse parses the cluster/nodepool deployment files and saves the result as bytes grouped by the filename.
// Any variables passed to the cli will be replaced in the resources files following the golang text template format.
func (c *GKE) GKEDeploymentsParse(*kingpin.ParseContext) error {
if err := c.checkDeploymentVarsAndFiles(); err != nil {
return err
}
deploymentResource, err := provider.DeploymentsParse(c.DeploymentFiles, c.DeploymentVars)
if err != nil {
log.Fatalf("Couldn't parse deployment files: %v", err)
}
c.gkeResources = deploymentResource
return nil
}
// K8SDeploymentsParse parses the k8s objects deployment files and saves the result as k8s objects grouped by the filename.
// Any variables passed to the cli will be replaced in the resources files following the golang text template format.
func (c *GKE) K8SDeploymentsParse(*kingpin.ParseContext) error {
if err := c.checkDeploymentVarsAndFiles(); err != nil {
return err
}
deploymentResource, err := provider.DeploymentsParse(c.DeploymentFiles, c.DeploymentVars)
if err != nil {
log.Fatalf("Couldn't parse deployment files: %v", err)
}
for _, deployment := range deploymentResource {
decode := scheme.Codecs.UniversalDeserializer().Decode
k8sObjects := make([]runtime.Object, 0)
for _, text := range strings.Split(string(deployment.Content), provider.Separator) {
text = strings.TrimSpace(text)
if len(text) == 0 {
continue
}
resource, _, err := decode([]byte(text), nil, nil)
if err != nil {
return errors.Wrapf(err, "decoding the resource file:%v, section:%v...", deployment.FileName, text[:100])
}
if resource == nil {
continue
}
k8sObjects = append(k8sObjects, resource)
}
if len(k8sObjects) > 0 {
c.k8sResources = append(c.k8sResources, k8sProvider.Resource{FileName: deployment.FileName, Objects: k8sObjects})
}
}
return nil
}
// checkDeploymentVarsAndFiles checks whether the requied deployment vars are passed.
func (c *GKE) checkDeploymentVarsAndFiles() error {
reqDepVars := []string{"GKE_PROJECT_ID", "ZONE", "CLUSTER_NAME"}
for _, k := range reqDepVars {
if v, ok := c.DeploymentVars[k]; !ok || v == "" {
return fmt.Errorf("missing required %v variable", k)
}
}
if len(c.DeploymentFiles) == 0 {
return fmt.Errorf("missing deployment file(s)")
}
return nil
}
// ClusterCreate create a new cluster or applies changes to an existing cluster.
func (c *GKE) ClusterCreate(*kingpin.ParseContext) error {
req := &containerpb.CreateClusterRequest{}
for _, deployment := range c.gkeResources {
if err := yamlGo.UnmarshalStrict(deployment.Content, req); err != nil {
log.Fatalf("Error parsing the cluster deployment file %s:%v", deployment.FileName, err)
}
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
log.Printf("Cluster create request: name:'%v', project `%s`,zone `%s`", req.Cluster.Name, req.ProjectId, req.Zone)
_, err := c.clientGKE.CreateCluster(c.ctx, req)
if err != nil {
log.Fatalf("Couldn't create cluster '%v', file:%v ,err: %v", req.Cluster.Name, deployment.FileName, err)
}
err = provider.RetryUntilTrue(
fmt.Sprintf("creating cluster:%v", req.Cluster.Name),
provider.GlobalRetryCount,
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
func() (bool, error) { return c.clusterRunning(req.Zone, req.ProjectId, req.Cluster.Name) })
if err != nil {
log.Fatalf("creating cluster err:%v", err)
}
}
return nil
}
// ClusterDelete deletes a k8s cluster.
func (c *GKE) ClusterDelete(*kingpin.ParseContext) error {
// Use CreateClusterRequest struct to pass the UnmarshalStrict validation and
// than use the result to create the DeleteClusterRequest
reqC := &containerpb.CreateClusterRequest{}
for _, deployment := range c.gkeResources {
if err := yamlGo.UnmarshalStrict(deployment.Content, reqC); err != nil {
log.Fatalf("Error parsing the cluster deployment file %s:%v", deployment.FileName, err)
}
reqD := &containerpb.DeleteClusterRequest{
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
ProjectId: reqC.ProjectId,
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
Zone: reqC.Zone,
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
ClusterId: reqC.Cluster.Name,
}
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
log.Printf("Removing cluster '%v', project '%v', zone '%v'", reqD.ClusterId, reqD.ProjectId, reqD.Zone)
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
err := provider.RetryUntilTrue(
fmt.Sprintf("deleting cluster:%v", reqD.ClusterId),
provider.GlobalRetryCount,
func() (bool, error) { return c.clusterDeleted(reqD) })
if err != nil {
log.Fatalf("removing cluster err:%v", err)
}
}
return nil
}
// clusterDeleted checks whether a cluster has been deleted.
func (c *GKE) clusterDeleted(req *containerpb.DeleteClusterRequest) (bool, error) {
rep, err := c.clientGKE.DeleteCluster(c.ctx, req)
if err != nil {
st, ok := status.FromError(err)
if !ok {
return false, fmt.Errorf("unknown reply status error %v", err)
}
if st.Code() == codes.NotFound {
return true, nil
}
if st.Code() == codes.FailedPrecondition {
log.Printf("Cluster in 'FailedPrecondition' state '%s'", err)
return false, nil
}
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
return false, errors.Wrapf(err, "deleting cluster:%v", req.ClusterId)
}
log.Printf("cluster status: `%v`", rep.Status)
return false, nil
}
// clusterRunning checks whether a cluster is in a running state.
func (c *GKE) clusterRunning(zone, projectID, clusterID string) (bool, error) {
req := &containerpb.GetClusterRequest{
ProjectId: projectID,
Zone: zone,
ClusterId: clusterID,
}
cluster, err := c.clientGKE.GetCluster(c.ctx, req)
if err != nil {
// We don't consider none existing cluster error a failure. So don't return an error here.
if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
return false, nil
}
return false, fmt.Errorf("Couldn't get cluster status:%v", err)
}
if cluster.Status == containerpb.Cluster_ERROR ||
cluster.Status == containerpb.Cluster_STATUS_UNSPECIFIED ||
cluster.Status == containerpb.Cluster_STOPPING {
return false, fmt.Errorf("Cluster not in a status to become ready - %s", cluster.Status)
}
if cluster.Status == containerpb.Cluster_RUNNING {
return true, nil
}
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
log.Printf("Cluster '%v' status:%v , %v", projectID, cluster.Status, cluster.StatusMessage)
return false, nil
}
// NodePoolCreate creates a new k8s node-pool in an existing cluster.
func (c *GKE) NodePoolCreate(*kingpin.ParseContext) error {
reqC := &containerpb.CreateClusterRequest{}
for _, deployment := range c.gkeResources {
if err := yamlGo.UnmarshalStrict(deployment.Content, reqC); err != nil {
log.Fatalf("Error parsing the cluster deployment file %s:%v", deployment.FileName, err)
}
for _, node := range reqC.Cluster.NodePools {
reqN := &containerpb.CreateNodePoolRequest{
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
ProjectId: reqC.ProjectId,
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
Zone: reqC.Zone,
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
ClusterId: reqC.Cluster.Name,
NodePool: node,
}
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
log.Printf("Cluster nodepool create request: cluster '%v', nodepool '%v' , project `%s`,zone `%s`", reqN.ClusterId, reqN.NodePool.Name, reqN.ProjectId, reqN.Zone)
err := provider.RetryUntilTrue(
fmt.Sprintf("nodepool creation:%v", reqN.NodePool.Name),
provider.GlobalRetryCount,
func() (bool, error) {
return c.nodePoolCreated(reqN)
})
if err != nil {
log.Fatalf("Couldn't create cluster nodepool '%v', file:%v ,err: %v", node.Name, deployment.FileName, err)
}
err = provider.RetryUntilTrue(
fmt.Sprintf("checking nodepool running status for:%v", reqN.NodePool.Name),
provider.GlobalRetryCount,
func() (bool, error) {
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
return c.nodePoolRunning(reqN.Zone, reqN.ProjectId, reqN.ClusterId, reqN.NodePool.Name)
})
if err != nil {
log.Fatalf("Couldn't create cluster nodepool '%v', file:%v ,err: %v", node.Name, deployment.FileName, err)
}
}
}
return nil
}
// nodePoolCreated checks if there is any ongoing NodePool operation on the cluster
// when creating a NodePool.
func (c *GKE) nodePoolCreated(req *containerpb.CreateNodePoolRequest) (bool, error) {
rep, err := c.clientGKE.CreateNodePool(c.ctx, req)
if err != nil {
st, ok := status.FromError(err)
if !ok {
return false, fmt.Errorf("unknown reply status error %v", err)
}
if st.Code() == codes.FailedPrecondition {
// GKE cannot have two simultaneous nodepool operations running on it
// Waiting for any ongoing operation to complete before starting new one
log.Printf("Cluster in 'FailedPrecondition' state '%s'", err)
return false, nil
}
return false, err
}
log.Printf("cluster node pool status: `%v`", rep.Status)
return true, nil
}
// NodePoolDelete deletes a new k8s node-pool in an existing cluster.
func (c *GKE) NodePoolDelete(*kingpin.ParseContext) error {
// Use CreateNodePoolRequest struct to pass the UnmarshalStrict validation and
// than use the result to create the DeleteNodePoolRequest
reqC := &containerpb.CreateClusterRequest{}
for _, deployment := range c.gkeResources {
if err := yamlGo.UnmarshalStrict(deployment.Content, reqC); err != nil {
log.Fatalf("Error parsing the cluster deployment file %s:%v", deployment.FileName, err)
}
for _, node := range reqC.Cluster.NodePools {
reqD := &containerpb.DeleteNodePoolRequest{
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
ProjectId: reqC.ProjectId,
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
Zone: reqC.Zone,
ClusterId: reqC.Cluster.Name,
NodePoolId: node.Name,
}
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
log.Printf("Removing cluster node pool: `%v`, cluster '%v', project '%v', zone '%v'", reqD.NodePoolId, reqD.ClusterId, reqD.ProjectId, reqD.Zone)
err := provider.RetryUntilTrue(
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
fmt.Sprintf("deleting nodepool:%v", reqD.NodePoolId),
provider.GlobalRetryCount,
func() (bool, error) { return c.nodePoolDeleted(reqD) })
if err != nil {
log.Fatalf("Couldn't delete cluster nodepool '%v', file:%v ,err: %v", node.Name, deployment.FileName, err)
}
}
}
return nil
}
// nodePoolDeleted checks whether a nodepool has been deleted.
func (c *GKE) nodePoolDeleted(req *containerpb.DeleteNodePoolRequest) (bool, error) {
rep, err := c.clientGKE.DeleteNodePool(c.ctx, req)
if err != nil {
st, ok := status.FromError(err)
if !ok {
return false, fmt.Errorf("unknown reply status error %v", err)
}
if st.Code() == codes.NotFound {
return true, nil
}
if st.Code() == codes.FailedPrecondition {
// GKE cannot have two simultaneous nodepool operations running on it
// Waiting for any ongoing operation to complete before starting new one
log.Printf("Cluster in 'FailedPrecondition' state '%s'", err)
return false, nil
}
return false, err
}
log.Printf("cluster node pool status: `%v`", rep.Status)
return false, nil
}
// nodePoolRunning checks whether a nodepool has been created and is running.
func (c *GKE) nodePoolRunning(zone, projectID, clusterID, poolName string) (bool, error) {
req := &containerpb.GetNodePoolRequest{
ProjectId: projectID,
Zone: zone,
ClusterId: clusterID,
NodePoolId: poolName,
}
rep, err := c.clientGKE.GetNodePool(c.ctx, req)
if err != nil {
// We don't consider none existing cluster node pool a failure. So don't return an error here.
if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
return false, nil
}
return false, fmt.Errorf("Couldn't get node pool status:%v", err)
}
if rep.Status == containerpb.NodePool_RUNNING {
return true, nil
}
if rep.Status == containerpb.NodePool_ERROR ||
rep.Status == containerpb.NodePool_RUNNING_WITH_ERROR ||
rep.Status == containerpb.NodePool_STOPPING ||
rep.Status == containerpb.NodePool_STATUS_UNSPECIFIED {
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
log.Fatalf("NodePool %s not in a status to become ready: %v", rep.Name, rep.StatusMessage)
}
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
log.Printf("Current cluster node pool '%v' status:%v , %v", rep.Name, rep.Status, rep.StatusMessage)
return false, nil
}
// AllNodepoolsRunning returns an error if at least one node pool is not running.
func (c *GKE) AllNodepoolsRunning(*kingpin.ParseContext) error {
reqC := &containerpb.CreateClusterRequest{}
for _, deployment := range c.gkeResources {
if err := yamlGo.UnmarshalStrict(deployment.Content, reqC); err != nil {
return errors.Errorf("error parsing the cluster deployment file %s:%v", deployment.FileName, err)
}
for _, node := range reqC.Cluster.NodePools {
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
isRunning, err := c.nodePoolRunning(reqC.Zone, reqC.ProjectId, reqC.Cluster.Name, node.Name)
if err != nil {
log.Fatalf("error fetching nodePool info")
}
if !isRunning {
log.Fatalf("nodepool not running name: %v", node.Name)
}
}
}
return nil
}
// AllNodepoolsDeleted returns an error if at least one nodepool is not deleted.
func (c *GKE) AllNodepoolsDeleted(*kingpin.ParseContext) error {
reqC := &containerpb.CreateClusterRequest{}
for _, deployment := range c.gkeResources {
if err := yamlGo.UnmarshalStrict(deployment.Content, reqC); err != nil {
return errors.Errorf("error parsing the cluster deployment file %s:%v", deployment.FileName, err)
}
for _, node := range reqC.Cluster.NodePools {
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
isRunning, err := c.nodePoolRunning(reqC.Zone, reqC.ProjectId, reqC.Cluster.Name, node.Name)
if err != nil {
log.Fatalf("error fetching nodePool info")
}
if isRunning {
log.Fatalf("nodepool running name: %v", node.Name)
}
}
}
return nil
}
// NewK8sProvider sets the k8s provider used for deploying k8s manifests.
func (c *GKE) NewK8sProvider(*kingpin.ParseContext) error {
// Get the authentication certificate for the cluster using the GKE client.
req := &containerpb.GetClusterRequest{
ProjectId: c.DeploymentVars["GKE_PROJECT_ID"],
Zone: c.DeploymentVars["ZONE"],
ClusterId: c.DeploymentVars["CLUSTER_NAME"],
}
rep, err := c.clientGKE.GetCluster(c.ctx, req)
if err != nil {
log.Fatalf("failed to get cluster details: %v", err)
}
// The master auth retrieved from GCP it is base64 encoded so it must be decoded first.
caCert, err := base64.StdEncoding.DecodeString(rep.MasterAuth.GetClusterCaCertificate())
if err != nil {
log.Fatalf("failed to decode certificate: %v", err.Error())
}
cluster := clientcmdapi.NewCluster()
cluster.CertificateAuthorityData = []byte(caCert)
cluster.Server = fmt.Sprintf("https://%v", rep.Endpoint)
context := clientcmdapi.NewContext()
context.Cluster = rep.Name
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
context.AuthInfo = rep.Zone
authInfo := clientcmdapi.NewAuthInfo()
authInfo.AuthProvider = &clientcmdapi.AuthProviderConfig{
Name: "gcp",
Config: map[string]string{
"cmd-args": "config config-helper --format=json",
"expiry-key": "{.credential.token_expiry}",
"token-key": "{.credential.access_token}",
},
}
config := clientcmdapi.NewConfig()
config.Clusters[rep.Name] = cluster
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
config.Contexts[rep.Zone] = context
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
config.AuthInfos[rep.Zone] = authInfo
//nolint:staticcheck // SA1019 - Ignore "Do not use.".
config.CurrentContext = rep.Zone
c.k8sProvider, err = k8sProvider.New(c.ctx, config)
if err != nil {
log.Fatal("k8s provider error", err)
}
return nil
}
// ResourceApply calls k8s.ResourceApply to apply the k8s objects in the manifest files.
func (c *GKE) ResourceApply(*kingpin.ParseContext) error {
if err := c.k8sProvider.ResourceApply(c.k8sResources); err != nil {
log.Fatal("error while applying a resource err:", err)
}
return nil
}
// ResourceDelete calls k8s.ResourceDelete to apply the k8s objects in the manifest files.
func (c *GKE) ResourceDelete(*kingpin.ParseContext) error {
if err := c.k8sProvider.ResourceDelete(c.k8sResources); err != nil {
log.Fatal("error while deleting objects from a manifest file err:", err)
}
return nil
}
// GetDeploymentVars shows deployment variables.
func (c *GKE) GetDeploymentVars(parseContext *kingpin.ParseContext) error {
fmt.Print("-------------------\n DeploymentVars \n------------------- \n")
for key, value := range c.DeploymentVars {
fmt.Println(key, " : ", value)
}
return nil
}