pkg/tool/k8s.go (82 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ // Package tool provides tools for operator communicating with K8S cluster package tool import ( "bytes" "fmt" "io" "os" "path/filepath" "github.com/pkg/errors" core_v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/remotecommand" ) // K8sClient is a struct which contains the kubernetes.Interface and *rest.Config. type K8sClient struct { // kubernetes.Interface should be used instead of kubernets.Inteface for unit test (mocking) ClientSet kubernetes.Interface Config *rest.Config } // NewK8sClient is to generate a K8s client for interacting with the K8S cluster. func NewK8sClient() (*K8sClient, error) { var kubeconfig string if kubeConfigPath := os.Getenv("KUBECONFIG"); kubeConfigPath != "" { kubeconfig = kubeConfigPath // CI process } else { kubeconfig = filepath.Join(os.Getenv("HOME"), ".kube", "config") // Development environment } var config *rest.Config _, err := os.Stat(kubeconfig) if err != nil { // In cluster configuration config, err = rest.InClusterConfig() if err != nil { return nil, err } } else { // Out of cluster configuration config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { return nil, err } } clientset, err := kubernetes.NewForConfig(config) if err != nil { return nil, err } var client = &K8sClient{ClientSet: clientset, Config: config} return client, nil } // Exec enables operator to execute command in the pod's container in the K8S cluster. // It returns the standard output and the standard error output. func (client *K8sClient) Exec(namespace, podName, containerName string, command []string, stdin io.Reader) (*bytes.Buffer, *bytes.Buffer, error) { clientset, config := client.ClientSet, client.Config req := clientset.CoreV1().RESTClient().Post(). Resource("pods"). Name(podName). Namespace(namespace). SubResource("exec") scheme := runtime.NewScheme() if err := core_v1.AddToScheme(scheme); err != nil { return nil, nil, fmt.Errorf("error adding to scheme: %v", err) } parameterCodec := runtime.NewParameterCodec(scheme) req.VersionedParams(&core_v1.PodExecOptions{ Command: command, Container: containerName, Stdin: stdin != nil, Stdout: true, Stderr: true, TTY: false, }, parameterCodec) exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) if err != nil { return nil, nil, errors.Wrap(err, "Could not create SPDY executor") } var stdout, stderr bytes.Buffer err = exec.Stream(remotecommand.StreamOptions{ Stdin: stdin, Stdout: &stdout, Stderr: &stderr, Tty: false, }) if err != nil { return nil, nil, errors.Wrap(err, "Error while exec'ing stream") } return &stdout, &stderr, nil }