hack/deployer/runner/ocp.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package runner
import (
"bytes"
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"strings"
"text/template"
"time"
"github.com/elastic/cloud-on-k8s/v3/hack/deployer/exec"
"github.com/elastic/cloud-on-k8s/v3/hack/deployer/runner/env"
"github.com/elastic/cloud-on-k8s/v3/pkg/utils/vault"
)
const (
OCPDriverID = "ocp"
OCPVaultPath = "ci-ocp-k8s-operator"
OCPServiceAccountVaultFieldName = "service-account"
OCPPullSecretFieldName = "pull-secret"
OCPStateBucket = "eck-deployer-ocp-clusters-state"
DefaultOCPRunConfigTemplate = `id: ocp-dev
overrides:
  clusterName: %s-dev-cluster
  ocp:
    gCloudProject: %s
`
OcpInstallerConfigTemplate = `apiVersion: v1
baseDomain: {{.BaseDomain}}
compute:
- hyperthreading: Enabled
  name: worker
  platform:
    gcp:
      type: {{.MachineType}}
  replicas: {{.NodeCount}}
controlPlane:
  hyperthreading: Enabled
  name: master
  platform:
    gcp:
      type: {{.MachineType}}
  replicas: {{.NodeCount}}
metadata:
  creationTimestamp: null
  name: {{.ClusterName}}
networking:
  clusterNetwork:
  - cidr: 10.128.0.0/14
    hostPrefix: 23
  machineCIDR: 10.0.0.0/16
  networkType: OVNKubernetes
  serviceNetwork:
  - 172.30.0.0/16
platform:
  gcp:
    projectID: {{.GCloudProject}}
    region: {{.Region}}
pullSecret: '{{.PullSecret}}'`
)
func init() {
drivers[OCPDriverID] = &OCPDriverFactory{}
}
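// OCPDriverFactory creates OCP drivers from a plan.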
type OCPDriverFactory struct {
}
type runtimeState struct {
// Authenticated tracks authentication against the GCloud API to avoid double authentication.
Authenticated bool
// SafeToDeleteWorkdir indicates that the installer state has been uploaded successfully to the storage bucket or is
// otherwise not needed anymore.
SafeToDeleteWorkdir bool
// ClusterStateDir is the effective work dir containing the OCP installer state. Derived from plan.OCP.Workdir.
ClusterStateDir string
// ClientImage is the name of the installer client image.
ClientImage string
}
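// OCPDriver installs and tears down OpenShift (OCP) clusters on GCP using the openshift-install tooling.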
type OCPDriver struct {
plan Plan
runtimeState runtimeState
vaultClient vault.Client
}
func (*OCPDriverFactory) Create(plan Plan) (Driver, error) {
c, err := vault.NewClient()
if err != nil {
return nil, err
}
return &OCPDriver{
plan: plan,
vaultClient: c,
}, nil
}
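// setup returns the steps shared by Execute and GetCredentials: prepare the work dir, authenticate to GCP,
// fetch the pull secret and download any previously uploaded cluster state.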
func (d *OCPDriver) setup() []func() error {
return []func() error{
d.ensureWorkDir,
d.authToGCP,
d.ensurePullSecret,
d.downloadClusterState,
}
}
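// Execute runs the operation requested in the plan: it deletes the cluster if it exists, or creates it and
// then sets up kubeconfig, disks and the storage class.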
func (d *OCPDriver) Execute() error {
// the client image requires a plan, which we don't have in GetCredentials
setup := append(d.setup(), d.ensureClientImage)
if err := run(setup); err != nil {
return err
}
defer func() {
_ = d.removeWorkDir()
}()
clusterStatus := d.currentStatus()
switch d.plan.Operation {
case DeleteAction:
if clusterStatus != NotFound {
// always attempt a deletion
return d.delete()
}
log.Printf("Not deleting as cluster doesn't exist")
case CreateAction:
if clusterStatus == Running {
log.Printf("Not creating as cluster exists")
// rsync sometimes gets stuck; this makes sure we retry the upload on repeated create invocations
if err := d.uploadClusterState(); err != nil {
return err
}
} else if err := d.create(); err != nil {
return err
}
return run([]func() error{
d.copyKubeconfig,
d.setupDisks,
createStorageClass,
})
default:
return fmt.Errorf("unknown operation %s", d.plan.Operation)
}
return nil
}
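// create renders install-config.yaml from the plan and runs the OpenShift installer; the installer state is
// uploaded to the state bucket even if the installation fails so that a later delete can clean up.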
func (d *OCPDriver) create() error {
log.Println("Creating cluster...")
params := map[string]interface{}{
GoogleCloudProjectCtxKey: d.plan.Ocp.GCloudProject,
"ClusterName": d.plan.ClusterName,
"Region": d.plan.Ocp.Region,
"AdminUsername": d.plan.Ocp.AdminUsername,
"KubernetesVersion": d.plan.KubernetesVersion,
"MachineType": d.plan.MachineType,
"LocalSsdCount": d.plan.Ocp.LocalSsdCount,
"NodeCount": d.plan.Ocp.NodeCount,
"BaseDomain": d.baseDomain(),
"OCPStateBucket": OCPStateBucket,
"PullSecret": d.plan.Ocp.PullSecret,
}
var tpl bytes.Buffer
if err := template.Must(template.New("").Parse(OcpInstallerConfigTemplate)).Execute(&tpl, params); err != nil {
return err
}
installConfig := filepath.Join(d.runtimeState.ClusterStateDir, "install-config.yaml")
err := os.WriteFile(installConfig, tpl.Bytes(), 0600)
if err != nil {
return err
}
err = d.runInstallerCommand("create")
// We want to *always* upload the state of the cluster: this way we can run a delete operation
// even on failed deployments to clean up all the resources on GCP.
_ = d.uploadClusterState()
return err
}
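// delete destroys the cluster via the installer, removes its state from the state bucket and drops the
// kubeconfig context.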
func (d *OCPDriver) delete() error {
log.Printf("Deleting cluster %s ...\n", d.plan.ClusterName)
err := d.runInstallerCommand("destroy")
if err != nil {
return err
}
// No need to check whether this `rm` command succeeds
_ = exec.NewCommand("gsutil rm -r gs://{{.OCPStateBucket}}/{{.ClusterName}}").AsTemplate(d.bucketParams()).WithoutStreaming().Run()
d.runtimeState.SafeToDeleteWorkdir = true
return d.removeKubeconfig()
}
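// GetCredentials downloads the cluster state and merges the cluster's kubeconfig into the local configuration.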
func (d *OCPDriver) GetCredentials() error {
if err := run(d.setup()); err != nil {
return err
}
defer func() {
_ = d.removeWorkDir()
}()
return d.copyKubeconfig()
}
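// run executes the given steps in order, stopping at the first error.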
func run(steps []func() error) error {
for _, fn := range steps {
if err := fn(); err != nil {
return err
}
}
return nil
}
func (d *OCPDriver) setupDisks() error {
return setupDisks(d.plan)
}
func (d *OCPDriver) ensureClientImage() error {
image, err := ensureClientImage(OCPDriverID, d.vaultClient, d.plan.ClientVersion, d.plan.ClientBuildDefDir)
if err != nil {
return err
}
d.runtimeState.ClientImage = image
return nil
}
func (d *OCPDriver) ensurePullSecret() error {
if d.plan.Ocp.PullSecret == "" {
s, err := vault.Get(d.vaultClient, OCPVaultPath, OCPPullSecretFieldName)
if err != nil {
return err
}
d.plan.Ocp.PullSecret = s
}
return nil
}
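// ensureWorkDir makes sure the work dir holding the installer state exists, defaulting to a temporary
// directory under HOME if none is configured.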
func (d *OCPDriver) ensureWorkDir() error {
if d.runtimeState.ClusterStateDir != "" {
// already initialised
return nil
}
workDir := d.plan.Ocp.WorkDir
if workDir == "" {
// Base the work dir in the HOME dir: otherwise mounting it into the container won't work on macOS in local
// mode without further settings adjustments, and in CI mode the work dir needs to be in the volume shared
// between containers. Having the work dir in HOME also underlines the importance of its contents: the work
// dir is the only source to cleanly uninstall the cluster should the rsync fail.
var err error
workDir, err = os.MkdirTemp(os.Getenv("HOME"), d.plan.ClusterName)
if err != nil {
return err
}
log.Printf("Defaulting WorkDir: %s", workDir)
}
if err := os.MkdirAll(workDir, os.ModePerm); err != nil {
return err
}
d.runtimeState.ClusterStateDir = workDir
log.Printf("Using ClusterStateDir: %s", workDir)
return nil
}
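// removeWorkDir deletes the work dir unless the installer state has not been backed up or StickyWorkDir is set.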
func (d *OCPDriver) removeWorkDir() error {
if !d.runtimeState.SafeToDeleteWorkdir {
log.Printf("Not deleting work dir as rsync backup of installer state not successful")
return nil
}
// Keep the work dir around: useful for debugging or when running in non-CI mode.
if d.plan.Ocp.StickyWorkDir {
log.Printf("Not deleting work dir as requested via StickyWorkDir option")
return nil
}
return os.RemoveAll(d.plan.Ocp.WorkDir)
}
func (d *OCPDriver) authToGCP() error {
// avoid double authentication
if d.runtimeState.Authenticated {
return nil
}
if err := authToGCP(
d.vaultClient, OCPVaultPath, OCPServiceAccountVaultFieldName,
d.plan.ServiceAccount, false, d.plan.Ocp.GCloudProject,
); err != nil {
return err
}
d.runtimeState.Authenticated = true
return nil
}
type ClusterStatus string
var (
PartiallyDeployed ClusterStatus = "PartiallyDeployed"
NotFound ClusterStatus = "NotFound"
NotResponding ClusterStatus = "NotResponding"
Running ClusterStatus = "Running"
)
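// currentStatus classifies the cluster as NotFound, PartiallyDeployed, NotResponding or Running based on the
// local state dir and a kubectl probe against the API server.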
func (d *OCPDriver) currentStatus() ClusterStatus {
log.Println("Checking if cluster exists...")
kubeConfig := filepath.Join(d.runtimeState.ClusterStateDir, "auth", "kubeconfig")
if _, err := os.Stat(kubeConfig); os.IsNotExist(err) {
if empty, err := isEmpty(d.runtimeState.ClusterStateDir); empty && err == nil {
return NotFound
}
return PartiallyDeployed
}
log.Println("Cluster state synced: Testing that the OpenShift cluster is alive... ")
cmd := "kubectl version"
alive, err := exec.NewCommand(cmd).WithoutStreaming().WithVariable("KUBECONFIG", kubeConfig).OutputContainsAny("Server Version")
if !alive || err != nil { // the error will typically be non-nil when alive is false, but be explicit here to avoid returning Running on a non-nil error
log.Printf("a cluster state dir was found in %s but the cluster is not responding to `kubectl version`: %v", d.runtimeState.ClusterStateDir, err)
return NotResponding
}
return Running
}
func isEmpty(dir string) (bool, error) {
// https://stackoverflow.com/questions/30697324/how-to-check-if-directory-on-path-is-empty
f, err := os.Open(dir)
if err != nil {
return false, err
}
defer f.Close()
_, err = f.Readdirnames(1)
if errors.Is(err, io.EOF) {
return true, nil
}
return false, err
}
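// uploadClusterState rsyncs the installer state to the state bucket, creating the bucket if necessary, and
// marks the work dir as safe to delete on success.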
func (d *OCPDriver) uploadClusterState() error {
// Let's check that the cluster dir exists
// before we attempt an upload.
if _, err := os.Stat(d.runtimeState.ClusterStateDir); os.IsNotExist(err) {
log.Printf("clusterStateDir %s not present", d.runtimeState.ClusterStateDir)
return nil
}
bucketNotFound, err := exec.NewCommand("gsutil ls gs://{{.OCPStateBucket}}").
AsTemplate(d.bucketParams()).
WithoutStreaming().
OutputContainsAny("BucketNotFoundException")
if err != nil {
return fmt.Errorf("while checking state bucket existence %w", err)
}
if bucketNotFound {
if err := exec.NewCommand("gsutil mb gs://{{.OCPStateBucket}}").AsTemplate(d.bucketParams()).Run(); err != nil {
return fmt.Errorf("while creating storage bucket: %w", err)
}
}
// rsync seems to get stuck every now and then, at least in local mode, so let's retry a few times
err = exec.NewCommand("gsutil rsync -r -d {{.ClusterStateDir}} gs://{{.OCPStateBucket}}/{{.ClusterName}}").
WithLog("Uploading cluster state").
AsTemplate(d.bucketParams()).
WithoutStreaming().
RunWithRetries(3, 15*time.Minute)
if err == nil {
d.runtimeState.SafeToDeleteWorkdir = true
}
return err
}
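// downloadClusterState rsyncs any previously uploaded installer state from the state bucket into the work dir;
// a missing bucket or path is expected when no cluster has been created yet.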
func (d *OCPDriver) downloadClusterState() error {
cmd := "gsutil rsync -r -d gs://{{.OCPStateBucket}}/{{.ClusterName}} {{.ClusterStateDir}}"
doesNotExist, err := exec.NewCommand(cmd).
AsTemplate(d.bucketParams()).
WithLog("Synching cluster state").
WithoutStreaming().
OutputContainsAny("BucketNotFoundException", "does not name a directory, bucket, or bucket subdir")
if doesNotExist {
log.Printf("No remote cluster state found")
return nil // swallow this error as it is expected if no cluster has been created yet
}
return err
}
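// copyKubeconfig merges the installer-generated kubeconfig into the local kubeconfig and switches to the
// admin context.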
func (d *OCPDriver) copyKubeconfig() error {
log.Printf("Copying credentials")
kubeConfig := filepath.Join(d.runtimeState.ClusterStateDir, "auth", "kubeconfig")
// 1. merge or create kubeconfig
if err := mergeKubeconfig(kubeConfig); err != nil {
return err
}
// 2. after merging, make sure that the OCP context, which is always called `admin`, is in use
return exec.NewCommand("kubectl config use-context admin").Run()
}
func (d *OCPDriver) removeKubeconfig() error {
return removeKubeconfig("admin", "admin", "admin")
}
func (d *OCPDriver) bucketParams() map[string]interface{} {
return map[string]interface{}{
"OCPStateBucket": OCPStateBucket,
"ClusterName": d.plan.ClusterName,
"ClusterStateDir": d.runtimeState.ClusterStateDir,
}
}
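// runInstallerCommand runs `openshift-install <action> cluster` inside the client image, with the shared
// volume mounted as HOME so that the GCloud credentials are picked up.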
func (d *OCPDriver) runInstallerCommand(action string) error {
params := map[string]interface{}{
"ClusterStateDirBase": filepath.Base(d.runtimeState.ClusterStateDir),
"SharedVolume": env.SharedVolumeName(),
"GCloudCredsPath": filepath.Join("/home", GCPDir, ServiceAccountFilename),
"OCPToolsDockerImage": d.runtimeState.ClientImage,
"Action": action,
}
// We mount the shared volume into the installer container and configure it to be the HOME directory.
// This is mainly so that the GCloud tooling picks up the authentication information correctly, as the base image is
// scratch+curl and thus otherwise empty.
// We also mount /tmp as the installer needs a scratch space and writing into the container won't work.
cmd := exec.NewCommand(`docker run --rm \
-v {{.SharedVolume}}:/home \
-v /tmp:/tmp \
-e GOOGLE_APPLICATION_CREDENTIALS={{.GCloudCredsPath}} \
-e HOME=/home \
{{.OCPToolsDockerImage}} \
/openshift-install {{.Action}} cluster --log-level warn --dir /home/{{.ClusterStateDirBase}}`)
return cmd.AsTemplate(params).Run()
}
func (d *OCPDriver) baseDomain() string {
baseDomain := d.plan.Ocp.BaseDomain
// Domains used for the OCP deployment must be
// pre-configured on the destination cloud. A zone
// for these domains must exist and it has to be
// reachable from the internet as `openshift-installer`
// interacts with the deployed OCP cluster to monitor
// and complete the deployment.
//
// The default `eck-ocp.elastic.dev` subdomain is configured
// on AWS as an NS record and points to a zone configured in
// the `elastic-cloud-dev` project on GCP.
if baseDomain == "" {
baseDomain = "eck-ocp.elastic.dev"
}
return baseDomain
}
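// Cleanup deletes OCP clusters whose names match the given prefix and whose instances were created more than
// olderThan ago.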
func (d *OCPDriver) Cleanup(prefix string, olderThan time.Duration) error {
if err := d.authToGCP(); err != nil {
return err
}
sinceDate := time.Now().Add(-olderThan)
params := d.bucketParams()
params["Date"] = sinceDate.Format(time.RFC3339)
params["E2EClusterNamePrefix"] = prefix
params["Region"] = d.plan.Ocp.Region
if d.plan.Ocp.GCloudProject == "" {
gCloudProject, err := vault.Get(d.vaultClient, OCPVaultPath, GKEProjectVaultFieldName)
if err != nil {
return err
}
d.plan.Ocp.GCloudProject = gCloudProject
}
params["GCloudProject"] = d.plan.Ocp.GCloudProject
zonesCmd := `gcloud compute zones list --verbosity error --filter='region:https://www.googleapis.com/compute/v1/projects/{{.GCloudProject}}/regions/{{.Region}}' --format="value(selfLink.name())"`
zones, err := exec.NewCommand(zonesCmd).AsTemplate(params).WithoutStreaming().OutputList()
if err != nil {
return err
}
params["Zones"] = strings.Join(zones, ",")
cmd := `gcloud compute instances list --verbosity error --zones={{.Zones}} --filter="name~'^{{.E2EClusterNamePrefix}}-ocp.*' AND status=RUNNING" --format=json | jq -r --arg d "{{.Date}}" 'map(select(.creationTimestamp | . <= $d))|.[].name' | grep -o '{{.E2EClusterNamePrefix}}-ocp-[a-z]*-[0-9]*' | sort | uniq`
clustersToDelete, err := exec.NewCommand(cmd).AsTemplate(params).WithoutStreaming().OutputList()
if err != nil {
return err
}
for _, cluster := range clustersToDelete {
d.plan.ClusterName = cluster
d.plan.Operation = DeleteAction
if err = d.Execute(); err != nil {
log.Printf("while deleting cluster %s: %v", cluster, err.Error())
continue
}
}
return nil
}