dev-tools/mage/kubernetes/kuberemote.go:

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package kubernetes

import (
	"bufio"
	"context"
	"crypto/rand"
	"crypto/rsa"
	"crypto/x509"
	"encoding/hex"
	"encoding/pem"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"os"
	"os/exec"
	"strings"
	"time"

	"golang.org/x/crypto/ssh"

	apiv1 "k8s.io/api/core/v1"
	rbacv1 "k8s.io/api/rbac/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/portforward"
	watchtools "k8s.io/client-go/tools/watch"
	"k8s.io/client-go/transport/spdy"

	"github.com/elastic/elastic-agent/dev-tools/mage"
)

const sshBitSize = 4096

var mode = int32(256)

// KubeRemote rsyncs the passed directory to a pod and runs the command inside of that pod.
type KubeRemote struct {
	cfg *rest.Config
	cs  *kubernetes.Clientset

	namespace string
	name      string

	workDir string
	destDir string
	syncDir string

	svcAccName string
	secretName string
	privateKey []byte
	publicKey  []byte
}

// NewKubeRemote creates a new kubernetes remote runner.
func NewKubeRemote(kubeconfig string, namespace string, name string, workDir string, destDir string, syncDir string) (*KubeRemote, error) {
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return nil, err
	}
	cs, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	name = strings.Replace(name, "_", "-", -1)
	svcAccName := fmt.Sprintf("%s-sa", name)
	secretName := fmt.Sprintf("%s-ssh-key", name)
	privateKey, publicKey, err := generateSSHKeyPair()
	if err != nil {
		return nil, err
	}
	return &KubeRemote{config, cs, namespace, name, workDir, destDir, syncDir, svcAccName, secretName, privateKey, publicKey}, nil
}

// Run runs the command remotely on the kubernetes cluster.
func (r *KubeRemote) Run(env map[string]string, stdout io.Writer, stderr io.Writer, args ...string) error {
	if err := r.syncSSHKey(); err != nil {
		return fmt.Errorf("failed to sync SSH secret: %w", err)
	}
	defer r.deleteSSHKey()
	if err := r.syncServiceAccount(); err != nil {
		return err
	}
	defer r.deleteServiceAccount()
	_, err := r.createPod(env, args...)
	if err != nil {
		return fmt.Errorf("failed to create execute pod: %w", err)
	}
	defer r.deletePod()

	// wait for SSH to be up inside the init container.
	_, err = r.waitForPod(5*time.Minute, podInitReady)
	if err != nil {
		return fmt.Errorf("execute pod init container never started: %w", err)
	}
	time.Sleep(1 * time.Second) // SSH inside of container can take a moment

	// forward the SSH port so rsync can be run.
	randomPort, err := getFreePort()
	if err != nil {
		return fmt.Errorf("failed to find a free port: %w", err)
	}
	stopChannel := make(chan struct{}, 1)
	readyChannel := make(chan struct{}, 1)
	f, err := r.portForward([]string{fmt.Sprintf("%d:%d", randomPort, 22)}, stopChannel, readyChannel, stderr, stderr)
	if err != nil {
		return err
	}
	//nolint:errcheck // ignore error
	go f.ForwardPorts()
	<-readyChannel

	// perform the rsync
	if err := r.rsync(randomPort, stderr, stderr); err != nil {
		return err
	}

	// stop port forwarding
	close(stopChannel)

	// wait for exec container to be running
	_, err = r.waitForPod(5*time.Minute, containerRunning("exec"))
	if err != nil {
		return fmt.Errorf("execute pod container never started: %w", err)
	}

	// stream the logs of the container
	err = r.streamLogs("exec", stdout)
	if err != nil {
		return fmt.Errorf("failed to stream the logs: %w", err)
	}

	// wait for exec container to be completely done
	pod, err := r.waitForPod(30*time.Second, podDone)
	if err != nil {
		return fmt.Errorf("execute pod didn't terminate after 30 seconds of log stream: %w", err)
	}

	// return error on failure
	if pod.Status.Phase == apiv1.PodFailed {
		return fmt.Errorf("execute pod test failed")
	}
	return nil
}

// deleteSSHKey deletes the SSH key secret from the cluster.
func (r *KubeRemote) deleteSSHKey() {
	_ = r.cs.CoreV1().Secrets(r.namespace).Delete(context.TODO(), r.secretName, metav1.DeleteOptions{})
}

// syncSSHKey syncs the SSH key to the cluster.
func (r *KubeRemote) syncSSHKey() error {
	// delete before create
	r.deleteSSHKey()
	_, err := r.cs.CoreV1().Secrets(r.namespace).Create(
		context.TODO(),
		createSecretManifest(r.secretName, r.publicKey),
		metav1.CreateOptions{})
	if err != nil {
		return err
	}
	return nil
}

// deleteServiceAccount deletes the service account along with its cluster role and binding.
func (r *KubeRemote) deleteServiceAccount() {
	ctx := context.TODO()
	_ = r.cs.RbacV1().ClusterRoleBindings().Delete(ctx, r.name, metav1.DeleteOptions{})
	_ = r.cs.RbacV1().ClusterRoles().Delete(ctx, r.name, metav1.DeleteOptions{})
	_ = r.cs.CoreV1().ServiceAccounts(r.namespace).Delete(ctx, r.svcAccName, metav1.DeleteOptions{})
}

// syncServiceAccount syncs the required service account, cluster role, and binding.
func (r *KubeRemote) syncServiceAccount() error {
	ctx := context.TODO()
	// delete before create
	r.deleteServiceAccount()
	_, err := r.cs.CoreV1().ServiceAccounts(r.namespace).Create(
		ctx,
		createServiceAccountManifest(r.svcAccName),
		metav1.CreateOptions{})
	if err != nil {
		return fmt.Errorf("failed to create service account: %w", err)
	}
	_, err = r.cs.RbacV1().ClusterRoles().Create(ctx, createClusterRoleManifest(r.name), metav1.CreateOptions{})
	if err != nil {
		return fmt.Errorf("failed to create cluster role: %w", err)
	}
	_, err = r.cs.RbacV1().ClusterRoleBindings().Create(
		ctx,
		createClusterRoleBindingManifest(r.name, r.namespace, r.svcAccName),
		metav1.CreateOptions{})
	if err != nil {
		return fmt.Errorf("failed to create cluster role binding: %w", err)
	}
	return nil
}

// createPod creates the pod.
func (r *KubeRemote) createPod(env map[string]string, cmd ...string) (*apiv1.Pod, error) {
	version, err := mage.GoVersion()
	if err != nil {
		return nil, err
	}
	image := fmt.Sprintf("golang:%s", version)
	r.deletePod() // ensure it doesn't already exist
	return r.cs.CoreV1().Pods(r.namespace).Create(
		context.TODO(),
		createPodManifest(r.name, image, env, cmd, r.workDir, r.destDir, r.secretName, r.svcAccName),
		metav1.CreateOptions{})
}

// deletePod deletes the pod.
func (r *KubeRemote) deletePod() {
	_ = r.cs.CoreV1().Pods(r.namespace).Delete(context.TODO(), r.name, metav1.DeleteOptions{})
}

// waitForPod waits for the created pod to match the given condition.
func (r *KubeRemote) waitForPod(wait time.Duration, condition watchtools.ConditionFunc) (*apiv1.Pod, error) {
	w, err := r.cs.CoreV1().Pods(r.namespace).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: r.name}))
	if err != nil {
		return nil, err
	}
	ctx, _ := watchtools.ContextWithOptionalTimeout(context.Background(), wait)
	ev, err := watchtools.UntilWithoutRetry(ctx, w, func(ev watch.Event) (bool, error) {
		return condition(ev)
	})
	if ev != nil {
		return ev.Object.(*apiv1.Pod), err
	}
	return nil, err
}

// portForward runs the port forwarding so rsync over SSH can be run against the pod.
func (r *KubeRemote) portForward(ports []string, stopChannel, readyChannel chan struct{}, stdout, stderr io.Writer) (*portforward.PortForwarder, error) {
	roundTripper, upgrader, err := spdy.RoundTripperFor(r.cfg)
	if err != nil {
		return nil, err
	}
	path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", r.namespace, r.name)
	hostIP := strings.TrimPrefix(r.cfg.Host, "https://")
	serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP}
	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: roundTripper}, http.MethodPost, &serverURL)
	return portforward.New(dialer, ports, stopChannel, readyChannel, stdout, stderr)
}

// rsync performs the rsync of the sync directory to the destination directory inside of the pod.
func (r *KubeRemote) rsync(port uint16, stdout, stderr io.Writer) error {
	privateKeyFile, err := createTempFile(r.privateKey)
	if err != nil {
		return err
	}
	rsh := fmt.Sprintf("ssh -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o LogLevel=ERROR -p %d -i %s", port, privateKeyFile)
	args := []string{
		"--rsh", rsh,
		"-a", fmt.Sprintf("%s/", r.syncDir),
		fmt.Sprintf("root@localhost:%s", r.destDir),
	}
	cmd := exec.Command("rsync", args...)
	cmd.Stdout = stdout
	cmd.Stderr = stderr
	return cmd.Run()
}

// streamLogs streams the logs from the execution pod until the pod is terminated.
func (r *KubeRemote) streamLogs(container string, stdout io.Writer) error {
	req := r.cs.CoreV1().Pods(r.namespace).GetLogs(r.name, &apiv1.PodLogOptions{
		Container: container,
		Follow:    true,
	})
	logs, err := req.Stream(context.TODO())
	if err != nil {
		return err
	}
	defer logs.Close()

	reader := bufio.NewReader(logs)
	for {
		bytes, err := reader.ReadBytes('\n')
		if _, err := stdout.Write(bytes); err != nil {
			return err
		}
		if err != nil {
			if err != io.EOF {
				return err
			}
			return nil
		}
	}
}

// generateSSHKeyPair generates a new SSH key pair.
func generateSSHKeyPair() ([]byte, []byte, error) {
	private, err := rsa.GenerateKey(rand.Reader, sshBitSize)
	if err != nil {
		return nil, nil, err
	}
	if err = private.Validate(); err != nil {
		return nil, nil, err
	}
	public, err := ssh.NewPublicKey(&private.PublicKey)
	if err != nil {
		return nil, nil, err
	}
	return encodePrivateKeyToPEM(private), ssh.MarshalAuthorizedKey(public), nil
}

// encodePrivateKeyToPEM encodes the RSA private key to PEM format.
func encodePrivateKeyToPEM(privateKey *rsa.PrivateKey) []byte {
	privDER := x509.MarshalPKCS1PrivateKey(privateKey)
	privBlock := pem.Block{
		Type:    "RSA PRIVATE KEY",
		Headers: nil,
		Bytes:   privDER,
	}
	return pem.EncodeToMemory(&privBlock)
}

// getFreePort finds a free port.
func getFreePort() (uint16, error) {
	addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
	if err != nil {
		return 0, err
	}
	l, err := net.ListenTCP("tcp", addr)
	if err != nil {
		return 0, err
	}
	defer l.Close()
	return uint16(l.Addr().(*net.TCPAddr).Port), nil
}

// createSecretManifest creates the secret object to create in the cluster.
//
// This is the public key that the sshd uses as the authorized key.
func createSecretManifest(name string, publicKey []byte) *apiv1.Secret {
	return &apiv1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
		},
		StringData: map[string]string{
			"authorized_keys": string(publicKey),
		},
	}
}

// createServiceAccountManifest creates the service account the pod will use.
func createServiceAccountManifest(name string) *apiv1.ServiceAccount {
	return &apiv1.ServiceAccount{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
		},
	}
}

// createClusterRoleManifest creates the cluster role the pod will use.
//
// This gives the pod all permissions on everything!
func createClusterRoleManifest(name string) *rbacv1.ClusterRole {
	return &rbacv1.ClusterRole{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
		},
		Rules: []rbacv1.PolicyRule{
			{
				Verbs:     []string{"*"},
				APIGroups: []string{"*"},
				Resources: []string{"*"},
			},
			{
				Verbs:           []string{"*"},
				NonResourceURLs: []string{"*"},
			},
		},
	}
}

// createClusterRoleBindingManifest creates the cluster role binding the pod will use.
//
// This binds the service account to the cluster role.
func createClusterRoleBindingManifest(name string, namespace string, svcAccName string) *rbacv1.ClusterRoleBinding {
	return &rbacv1.ClusterRoleBinding{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
		},
		Subjects: []rbacv1.Subject{
			{
				Kind:      "ServiceAccount",
				Name:      svcAccName,
				Namespace: namespace,
			},
		},
		RoleRef: rbacv1.RoleRef{
			APIGroup: "rbac.authorization.k8s.io",
			Kind:     "ClusterRole",
			Name:     name,
		},
	}
}

// createPodManifest creates the pod inside of the cluster that will be used for remote execution.
//
// Creates a pod with an init container that runs sshd-rsync. Once the first connection closes, the init container
// exits and the exec container starts, using the rsync'd directory as its work directory.
func createPodManifest(name string, image string, env map[string]string, cmd []string, workDir string, destDir string, secretName string, svcAccName string) *apiv1.Pod {
	execEnv := []apiv1.EnvVar{
		{
			Name: "NODE_NAME",
			ValueFrom: &apiv1.EnvVarSource{
				FieldRef: &apiv1.ObjectFieldSelector{
					FieldPath: "spec.nodeName",
				},
			},
		},
	}
	for k, v := range env {
		execEnv = append(execEnv, apiv1.EnvVar{
			Name:  k,
			Value: v,
		})
	}
	return &apiv1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
		},
		Spec: apiv1.PodSpec{
			ServiceAccountName: svcAccName,
			HostNetwork:        true,
			DNSPolicy:          apiv1.DNSClusterFirstWithHostNet,
			RestartPolicy:      apiv1.RestartPolicyNever,
			InitContainers: []apiv1.Container{
				{
					Name:  "sync-init",
					Image: "ernoaapa/sshd-rsync",
					Ports: []apiv1.ContainerPort{
						{
							Name:          "ssh",
							Protocol:      apiv1.ProtocolTCP,
							ContainerPort: 22,
						},
					},
					Env: []apiv1.EnvVar{
						{
							Name:  "ONE_TIME",
							Value: "true",
						},
					},
					VolumeMounts: []apiv1.VolumeMount{
						{
							Name:      "ssh-config",
							MountPath: "/root/.ssh/authorized_keys",
							SubPath:   "authorized_keys",
						},
						{
							Name:      "destdir",
							MountPath: destDir,
						},
					},
				},
			},
			Containers: []apiv1.Container{
				{
					Name:       "exec",
					Image:      image,
					Command:    cmd,
					WorkingDir: workDir,
					Env:        execEnv,
					VolumeMounts: []apiv1.VolumeMount{
						{
							Name:      "destdir",
							MountPath: destDir,
						},
					},
				},
			},
			Volumes: []apiv1.Volume{
				{
					Name: "ssh-config",
					VolumeSource: apiv1.VolumeSource{
						Secret: &apiv1.SecretVolumeSource{
							SecretName:  secretName,
							DefaultMode: &mode,
						},
					},
				},
				{
					Name: "destdir",
					VolumeSource: apiv1.VolumeSource{
						EmptyDir: &apiv1.EmptyDirVolumeSource{},
					},
				},
			},
		},
	}
}

// podInitReady returns true once the pending pod is scheduled and its init containers are running.
func podInitReady(event watch.Event) (bool, error) {
	switch event.Type {
	case watch.Deleted:
		return false, k8serrors.NewNotFound(schema.GroupResource{Resource: "pods"}, "")
	}
	switch t := event.Object.(type) {
	case *apiv1.Pod:
		switch t.Status.Phase {
		case apiv1.PodFailed, apiv1.PodSucceeded:
			return false, nil
		case apiv1.PodRunning:
			return false, nil
		case apiv1.PodPending:
			return isInitContainersReady(t), nil
		}
	}
	return false, nil
}

// isInitContainersReady returns true when the pod is scheduled and all of its init containers are running.
func isInitContainersReady(pod *apiv1.Pod) bool {
	if isScheduled(pod) && isInitContainersRunning(pod) {
		return true
	}
	return false
}

// isScheduled returns true when the pod has been scheduled onto a node.
func isScheduled(pod *apiv1.Pod) bool {
	if len(pod.Status.Conditions) > 0 {
		for _, condition := range pod.Status.Conditions {
			if condition.Type == apiv1.PodScheduled && condition.Status == apiv1.ConditionTrue {
				return true
			}
		}
	}
	return false
}

// isInitContainersRunning returns true when every init container reports a running state.
func isInitContainersRunning(pod *apiv1.Pod) bool {
	if len(pod.Spec.InitContainers) != len(pod.Status.InitContainerStatuses) {
		return false
	}
	for _, status := range pod.Status.InitContainerStatuses {
		if status.State.Running == nil {
			return false
		}
	}
	return true
}

// containerRunning returns a watch condition that is true once the named container is running.
func containerRunning(containerName string) func(watch.Event) (bool, error) {
	return func(event watch.Event) (bool, error) {
		switch event.Type {
		case watch.Deleted:
			return false, k8serrors.NewNotFound(schema.GroupResource{Resource: "pods"}, "")
		}
		switch t := event.Object.(type) {
		case *apiv1.Pod:
			switch t.Status.Phase {
			case apiv1.PodFailed, apiv1.PodSucceeded:
				return false, nil
			case apiv1.PodRunning:
				return isContainerRunning(t, containerName)
			}
		}
		return false, nil
	}
}

// isContainerRunning reports whether the named container in the pod is currently running.
func isContainerRunning(pod *apiv1.Pod, containerName string) (bool, error) {
	for _, status := range pod.Status.ContainerStatuses {
		if status.Name == containerName {
			if status.State.Waiting != nil {
				return false, nil
			} else if status.State.Running != nil {
				return true, nil
			} else if status.State.Terminated != nil {
				return false, nil
			} else {
				return false, fmt.Errorf("unknown container state")
			}
		}
	}
	return false, nil
}
// podDone returns true once the pod has terminated, either successfully or with a failure.
func podDone(event watch.Event) (bool, error) {
	switch event.Type {
	case watch.Deleted:
		return false, k8serrors.NewNotFound(schema.GroupResource{Resource: "pods"}, "")
	}
	switch t := event.Object.(type) {
	case *apiv1.Pod:
		switch t.Status.Phase {
		case apiv1.PodFailed, apiv1.PodSucceeded:
			return true, nil
		}
	}
	return false, nil
}

// createTempFile writes the content to a randomly named temporary file and returns its path.
func createTempFile(content []byte) (string, error) {
	randBytes := make([]byte, 16)
	_, err := rand.Read(randBytes)
	if err != nil {
		return "", err
	}
	tmpfile, err := os.CreateTemp("", hex.EncodeToString(randBytes))
	if err != nil {
		return "", err
	}
	defer tmpfile.Close()
	if _, err := tmpfile.Write(content); err != nil {
		return "", err
	}
	return tmpfile.Name(), nil
}
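
For context, here is a minimal sketch of how a caller (for example, a mage target) might drive this runner. The package name, kubeconfig source, namespace, directories, and command below are illustrative assumptions, not values taken from this repository; only NewKubeRemote and Run come from the file above.

package example // hypothetical consumer package, not part of the repository

import (
	"os"

	"github.com/elastic/elastic-agent/dev-tools/mage/kubernetes"
)

// runTestsInCluster rsyncs the current directory into a throwaway pod and runs
// `go test ./...` inside it, streaming the exec container's logs to stdout.
func runTestsInCluster() error {
	remote, err := kubernetes.NewKubeRemote(
		os.Getenv("KUBECONFIG"), // kubeconfig of the target cluster (assumed to be set)
		"default",               // namespace for the pod, SSH-key secret, and service account
		"agent-remote-test",     // base resource name; "_" is replaced with "-" internally
		"/app",                  // working directory of the exec container
		"/app",                  // destination directory the synced files are mounted at
		".",                     // local directory rsync'd into the pod
	)
	if err != nil {
		return err
	}
	env := map[string]string{"CGO_ENABLED": "0"}
	return remote.Run(env, os.Stdout, os.Stderr, "go", "test", "./...")
}

Because Run defers deletion of the secret, service account, cluster role, binding, and pod, each invocation cleans up the resources it created even when the remote command fails.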