hack/deployer/runner/gke.go:
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package runner

import (
	"fmt"
	"log"
	"strings"
	"time"

	"github.com/ghodss/yaml"
	storagev1 "k8s.io/api/storage/v1"

	"github.com/elastic/cloud-on-k8s/v3/hack/deployer/exec"
	"github.com/elastic/cloud-on-k8s/v3/hack/deployer/runner/kyverno"
	"github.com/elastic/cloud-on-k8s/v3/pkg/utils/vault"
)
const (
storageClassPrefix = "e2e-"
GKEDriverID = "gke"
GKEVaultPath = "ci-gcp-k8s-operator"
GKEServiceAccountVaultFieldName = "service-account"
GKEProjectVaultFieldName = "gcloud-project"
GoogleCloudProjectCtxKey = "GCloudProject"
	DefaultGKERunConfigTemplate = `id: gke-dev
overrides:
  clusterName: %s-dev-cluster
  gke:
    gCloudProject: %s
`
)
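// For illustration, fmt.Sprintf(DefaultGKERunConfigTemplate, "jane", "my-project") (hypothetical
// user and project) renders the default run config as:
//
//	id: gke-dev
//	overrides:
//	  clusterName: jane-dev-cluster
//	  gke:
//	    gCloudProject: my-project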
var (
	// GKE uses at most 18 chars of the cluster name to prefix the persistent disks backing the PVCs created in a cluster
pvcPrefixMaxLength = 18
defaultClusterIPv4CIDR = "/20"
defaultServicesIPv4CIDR = "/20"
)
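// For illustration, a hypothetical cluster named "my-very-long-cluster-name" yields the truncated
// PVC prefix "my-very-long-clust" (first 18 chars), which the legacy orphaned-disk filter in
// delete() matches against disk names of the shape gke-my-very-long-clust*-pvc-*.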
func init() {
drivers[GKEDriverID] = &GKEDriverFactory{}
}
type GKEDriverFactory struct{}
type GKEDriver struct {
plan Plan
ctx map[string]interface{}
vaultClient vault.Client
}
func (gdf *GKEDriverFactory) Create(plan Plan) (Driver, error) {
pvcPrefix := plan.ClusterName
if len(pvcPrefix) > pvcPrefixMaxLength {
pvcPrefix = pvcPrefix[0:pvcPrefixMaxLength]
}
clusterIPv4CIDR := defaultClusterIPv4CIDR
if plan.Gke.ClusterIPv4CIDR != "" {
clusterIPv4CIDR = plan.Gke.ClusterIPv4CIDR
}
servicesIPv4CIDR := defaultServicesIPv4CIDR
if plan.Gke.ServicesIPv4CIDR != "" {
servicesIPv4CIDR = plan.Gke.ServicesIPv4CIDR
}
c, err := vault.NewClient()
if err != nil {
return nil, err
}
return &GKEDriver{
plan: plan,
ctx: map[string]interface{}{
GoogleCloudProjectCtxKey: plan.Gke.GCloudProject,
"ClusterName": plan.ClusterName,
"PVCPrefix": pvcPrefix,
"PlanId": plan.Id,
"Region": plan.Gke.Region,
"KubernetesVersion": plan.KubernetesVersion,
"MachineType": plan.MachineType,
"LocalSsdCount": plan.Gke.LocalSsdCount,
"GcpScopes": plan.Gke.GcpScopes,
"NodeCountPerZone": plan.Gke.NodeCountPerZone,
"ClusterIPv4CIDR": clusterIPv4CIDR,
"ServicesIPv4CIDR": servicesIPv4CIDR,
},
vaultClient: c,
}, nil
}
func (d *GKEDriver) Execute() error {
if err := authToGCP(
d.vaultClient, GKEVaultPath, GKEServiceAccountVaultFieldName,
d.plan.ServiceAccount, false, d.ctx[GoogleCloudProjectCtxKey],
); err != nil {
return err
}
exists, err := d.clusterExists()
if err != nil {
return err
}
switch d.plan.Operation {
case DeleteAction:
if exists {
err = d.delete()
} else {
log.Printf("not deleting as cluster doesn't exist")
}
case CreateAction:
if exists {
log.Printf("not creating as cluster exists")
} else {
if err := d.create(); err != nil {
return err
}
if err := d.bindRoles(); err != nil {
return err
}
}
if d.plan.Gke.Private {
log.Printf("a private cluster has been created, please retrieve credentials manually and create storage class and provider if needed")
log.Printf("to authorize a VM to access this cluster run the following command:\n"+
"$ gcloud container clusters update %s"+
" --region %s "+
"--enable-master-authorized-networks"+
" --master-authorized-networks <VM IP>/32",
d.plan.ClusterName, d.plan.Gke.Region)
log.Printf("you can then retrieve the credentials with the following command:\n"+
"$ gcloud container clusters get-credentials %s"+
" --region %s "+
" --project %s",
d.plan.ClusterName, d.plan.Gke.Region, d.plan.Gke.GCloudProject)
return nil
}
if err := d.GetCredentials(); err != nil {
return err
}
if err := d.copyBuiltInStorageClasses(); err != nil {
return err
}
if err := setupDisks(d.plan); err != nil {
return err
}
if err := createStorageClass(); err != nil {
return err
}
if d.plan.EnforceSecurityPolicies {
if err := kyverno.Install(); err != nil {
return err
}
// apply extra policies to prevent use of unlabeled storage classes which might escape garbage collection in CI
if err := apply(kyverno.GKEPolicies); err != nil {
return err
}
}
default:
err = fmt.Errorf("unknown operation %s", d.plan.Operation)
}
return err
}
const (
GoogleComputeEngineStorageProvider = "pd.csi.storage.gke.io"
)
// copyBuiltInStorageClasses adds the "labels" parameter to copies of the built-in GCE storage classes.
// These labels are automatically applied to GCE Persistent Disks provisioned using these storage classes.
func (d *GKEDriver) copyBuiltInStorageClasses() error {
storageClassesYaml, err := exec.NewCommand("kubectl get sc -o yaml").WithoutStreaming().Output()
if err != nil {
return err
}
storageClasses := storagev1.StorageClassList{}
if err := yaml.Unmarshal([]byte(storageClassesYaml), &storageClasses); err != nil {
return err
}
existingClassNames := make(map[string]struct{})
for _, sc := range storageClasses.Items {
existingClassNames[sc.Name] = struct{}{}
}
labels, err := d.resourcesLabels()
if err != nil {
return err
}
for _, storageClass := range storageClasses.Items {
if storageClass.Provisioner != GoogleComputeEngineStorageProvider {
continue
}
// this function might be called repeatedly
if strings.HasPrefix(storageClass.Name, storageClassPrefix) {
continue
}
		// This is a GCE storage class: copy it with the e2e prefix and labels
copied := copyWithPrefixAndLabels(storageClass, labels)
storageClassYaml, err := yaml.Marshal(copied)
if err != nil {
return err
}
if _, exists := existingClassNames[copied.Name]; exists {
// do not try to update existing classes
continue
}
// start by removing the default storage class marker so that our copy can take over that role
if err := exec.NewCommand(fmt.Sprintf(`kubectl annotate sc %s storageclass.kubernetes.io/is-default-class=false --overwrite=true`, storageClass.Name)).
WithoutStreaming().
Run(); err != nil {
return fmt.Errorf("while updating default storage class label: %w", err)
}
// kubectl apply the copied storage class
if err := apply(string(storageClassYaml)); err != nil {
return err
}
}
return nil
}
func apply(yaml string) error {
return exec.NewCommand(fmt.Sprintf(`cat <<EOF | kubectl apply -f -
%s
EOF`, yaml)).Run()
}
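// For illustration, apply pipes the manifest to kubectl through a shell heredoc; a call is roughly
// equivalent to running the following in a shell (manifest content hypothetical):
//
//	cat <<EOF | kubectl apply -f -
//	apiVersion: v1
//	kind: Namespace
//	metadata:
//	  name: demo
//	EOF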
func copyWithPrefixAndLabels(sc storagev1.StorageClass, labels string) storagev1.StorageClass {
copied := sc
// create a new object
copied.UID = ""
copied.ResourceVersion = ""
// remove the addonmanager label from GKE
delete(copied.Labels, "addonmanager.kubernetes.io/mode")
// add a prefix to distinguish them from the originals
copied.Name = storageClassPrefix + sc.Name
// add the labels for cost attribution and garbage collection
if copied.Parameters == nil {
copied.Parameters = make(map[string]string)
}
copied.Parameters["labels"] = labels
return copied
}
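// For illustration, a minimal sketch of the transformation (class name and labels hypothetical,
// metav1 import assumed):
//
//	in := storagev1.StorageClass{
//		ObjectMeta:  metav1.ObjectMeta{Name: "standard-rwo"},
//		Provisioner: GoogleComputeEngineStorageProvider,
//	}
//	out := copyWithPrefixAndLabels(in, "username=jane,cluster_name=test")
//	// out.Name == "e2e-standard-rwo"
//	// out.Parameters["labels"] == "username=jane,cluster_name=test"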
func (d *GKEDriver) resourcesLabels() (string, error) {
username, err := d.username(true)
if err != nil {
return "", err
}
return fmt.Sprintf(
"username=%s,cluster_name=%s,plan_id=%s,region=%s",
username, d.ctx["ClusterName"], d.ctx["PlanId"], d.ctx["Region"],
), nil
}
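// For illustration, with a hypothetical active gcloud account jane.doe@example.com and a plan
// named test-cluster (plan id gke-dev, region europe-west1), resourcesLabels returns:
//
//	username=jane_doe,cluster_name=test-cluster,plan_id=gke-dev,region=europe-west1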
func (d *GKEDriver) clusterExists() (bool, error) {
log.Println("Checking if cluster exists...")
cmd := "gcloud beta container clusters --project {{.GCloudProject}} describe {{.ClusterName}} --region {{.Region}}"
	// gcloud prints "Not found" when the cluster does not exist: treat that as a clean "does not exist"
	contains, err := exec.NewCommand(cmd).AsTemplate(d.ctx).WithoutStreaming().OutputContainsAny("Not found")
if contains {
return false, nil
}
return err == nil, err
}
func (d *GKEDriver) create() error {
log.Println("Creating cluster...")
opts := []string{}
if d.plan.Gke.NetworkPolicy {
if d.plan.Gke.Autopilot {
return fmt.Errorf("--enable-network-policy must not be set if autopilot is enabled")
}
opts = append(opts, "--enable-network-policy")
}
if d.plan.Gke.Private {
opts = append(opts, "--create-subnetwork name={{.ClusterName}}-private-subnet", "--enable-master-authorized-networks", "--enable-ip-alias", "--enable-private-nodes", "--enable-private-endpoint", "--master-ipv4-cidr", "172.16.0.32/28")
} else {
opts = append(opts, "--create-subnetwork range={{.ClusterIPv4CIDR}}", "--cluster-ipv4-cidr={{.ClusterIPv4CIDR}}", "--services-ipv4-cidr={{.ServicesIPv4CIDR}}")
}
labels, err := d.resourcesLabels()
if err != nil {
return err
}
labels = fmt.Sprintf("%s,%s", strings.Join(toList(elasticTags), ","), labels)
var createGKEClusterCommand string
if !d.plan.Gke.Autopilot {
createGKEClusterCommand = `gcloud beta container --quiet --project {{.GCloudProject}} clusters create {{.ClusterName}} ` +
`--labels "` + labels + `" --region {{.Region}} --no-enable-basic-auth --cluster-version {{.KubernetesVersion}} ` +
`--machine-type {{.MachineType}} --disk-type pd-ssd --disk-size 50 ` +
`--local-ssd-count {{.LocalSsdCount}} --scopes {{.GcpScopes}} --num-nodes {{.NodeCountPerZone}} ` +
`--addons HorizontalPodAutoscaling,HttpLoadBalancing ` +
`--no-enable-autoupgrade --no-enable-autorepair --enable-ip-alias --metadata disable-legacy-endpoints=true ` +
`--network projects/{{.GCloudProject}}/global/networks/default ` +
strings.Join(opts, " ")
} else {
// Autopilot cluster.
log.Println("autopilot cluster enabled")
createGKEClusterCommand = `gcloud beta container --quiet --project {{.GCloudProject}} clusters create-auto {{.ClusterName}} ` +
`--region {{.Region}} --cluster-version {{.KubernetesVersion}} ` +
`--scopes {{.GcpScopes}} --network projects/{{.GCloudProject}}/global/networks/default ` +
strings.Join(opts, " ")
}
err = exec.NewCommand(createGKEClusterCommand).
AsTemplate(d.ctx).
Run()
if err != nil {
return err
}
// Since gcloud doesn't support labels at creation time for autopilot clusters, update the labels after creation.
if d.plan.Gke.Autopilot {
return exec.NewCommand(`gcloud beta container --quiet --project {{.GCloudProject}} clusters update {{.ClusterName}} --region {{.Region}} --update-labels="` + labels + `"`).
AsTemplate(d.ctx).
Run()
}
return nil
}
// username attempts to extract the username from the currently active gcloud account.
// When the result is used in labels, the "unqualified" parameter should be set to true, because
// only lowercase letters ([a-z]), numeric characters ([0-9]), underscores (_) and dashes (-) are
// allowed in label values: e.g. jane.doe@example.com becomes jane_doe.
func (d *GKEDriver) username(unqualified bool) (string, error) {
user, err := exec.NewCommand(`gcloud auth list --filter=status:ACTIVE --format="value(account)"`).WithoutStreaming().Output()
if err != nil {
return "", err
}
if unqualified {
if idx := strings.Index(user, "@"); idx != -1 {
user = user[:idx]
}
user = strings.ReplaceAll(user, ".", "_")
}
return user, nil
}
func (d *GKEDriver) bindRoles() error {
user, err := d.username(false)
if err != nil {
return err
}
cmd := fmt.Sprintf("kubectl create clusterrolebinding cluster-admin-binding --clusterrole=cluster-admin --user=%s", user)
if d.plan.Gke.Private {
log.Printf("this is a private cluster, please bind roles manually from an authorized VM with the following command:\n$ %s\n", cmd)
return nil
}
log.Println("Binding roles...")
return exec.NewCommand(cmd).Run()
}
func (d *GKEDriver) GetCredentials() error {
log.Println("Verifying gcloud authentication...")
	// The --verbosity flag disables warnings and survey output.
out, err := exec.NewCommand(`gcloud auth list --filter=status:ACTIVE --format="value(account)" --verbosity error`).StdoutOnly().OutputList()
if err != nil {
return fmt.Errorf("while retrieving list of credentialed gcloud accounts: %w", err)
}
gcloudProjectInt, ok := d.ctx[GoogleCloudProjectCtxKey]
if !ok {
return fmt.Errorf("while retrieving google cloud project: missing key %s", GoogleCloudProjectCtxKey)
}
gCloudProject, ok := gcloudProjectInt.(string)
if !ok {
return fmt.Errorf("while retrieving google cloud project: key %s was not a string, was %T ", GoogleCloudProjectCtxKey, gcloudProjectInt)
}
// If there's no authenticated user, or the authenticated user doesn't exist in the configured project
// then we need to authenticate with what's within vault.
	if len(out) == 0 || !strings.Contains(out[0], gCloudProject) {
if err := authToGCP(
d.vaultClient, GKEVaultPath, GKEServiceAccountVaultFieldName,
d.plan.ServiceAccount, false, d.ctx[GoogleCloudProjectCtxKey],
); err != nil {
return fmt.Errorf("while authenticating to GCP: %w", err)
}
}
log.Println("Getting credentials...")
cmd := "gcloud container clusters --project {{.GCloudProject}} get-credentials {{.ClusterName}} --region {{.Region}}"
return exec.NewCommand(cmd).AsTemplate(d.ctx).Run()
}
func (d *GKEDriver) delete() error {
log.Println("Deleting cluster...")
cmd := "gcloud beta --quiet --project {{.GCloudProject}} container clusters delete {{.ClusterName}} --region {{.Region}}"
if err := exec.NewCommand(cmd).AsTemplate(d.ctx).Run(); err != nil {
return err
}
	// Deleting a cluster in GKE does not delete the associated disks; we have to delete them manually.
cmd = `gcloud compute disks list --filter='labels.cluster_name={{.ClusterName}} AND labels.region={{.Region}} AND -users:*' --format="value[separator=','](name,zone)" --project {{.GCloudProject}}`
disks, err := exec.NewCommand(cmd).AsTemplate(d.ctx).StdoutOnly().OutputList()
if err != nil {
return err
}
if err := d.deleteDisks(disks); err != nil {
return err
}
deletedDisks := len(disks)
// This is the "legacy" way to detect orphaned disks. Keep using it while all disks do not have labels.
cmd = `gcloud compute disks list --filter="name~^gke-{{.PVCPrefix}}.*-pvc-.+" --format="value[separator=','](name,zone)" --project {{.GCloudProject}}`
disks, err = exec.NewCommand(cmd).AsTemplate(d.ctx).StdoutOnly().OutputList()
if err != nil {
return err
}
if err := d.deleteDisks(disks); err != nil {
return err
}
deletedDisks += len(disks)
if deletedDisks == 0 {
log.Println("No GCE persistent disks deleted")
} else {
log.Printf("%d GCE persistent disks deleted", deletedDisks)
}
return nil
}
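// For illustration, each entry passed to deleteDisks has the "name,zone" shape produced by the
// gcloud disk listings above, e.g. (hypothetical disk):
//
//	gke-e2e-test-clust-pvc-0f2a6e1b,europe-west1-b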
func (d *GKEDriver) deleteDisks(disks []string) error {
for _, disk := range disks {
nameZone := strings.Split(disk, ",")
if len(nameZone) != 2 {
return fmt.Errorf("disk name and zone contained unexpected number of fields")
}
name, zone := nameZone[0], nameZone[1]
cmd := `gcloud compute disks delete {{.Name}} --project {{.GCloudProject}} --zone {{.Zone}} --quiet`
err := exec.NewCommand(cmd).
AsTemplate(map[string]interface{}{
GoogleCloudProjectCtxKey: d.plan.Gke.GCloudProject,
"Name": name,
"Zone": zone,
}).
Run()
if err != nil {
return err
}
}
return nil
}
func (d *GKEDriver) Cleanup(prefix string, olderThan time.Duration) error {
if d.ctx[GoogleCloudProjectCtxKey] == "" {
gCloudProject, err := vault.Get(d.vaultClient, GKEVaultPath, GKEProjectVaultFieldName)
if err != nil {
return err
}
d.ctx[GoogleCloudProjectCtxKey] = gCloudProject
}
if err := authToGCP(
d.vaultClient, GKEVaultPath, GKEServiceAccountVaultFieldName,
d.plan.ServiceAccount, false, d.ctx[GoogleCloudProjectCtxKey],
); err != nil {
return err
}
sinceDate := time.Now().Add(-olderThan)
d.ctx["Date"] = sinceDate.Format(time.RFC3339)
d.ctx["E2EClusterNamePrefix"] = prefix
cmd := `gcloud container clusters list --verbosity error --region={{.Region}} --format="value(name)" --filter="createTime<{{.Date}} AND name~{{.E2EClusterNamePrefix}}.*"`
clusters, err := exec.NewCommand(cmd).AsTemplate(d.ctx).OutputList()
if err != nil {
return err
}
for _, cluster := range clusters {
d.ctx["ClusterName"] = cluster
if err = d.delete(); err != nil {
log.Printf("while deleting cluster %s: %v", cluster, err.Error())
continue
}
}
return nil
}