internal/kubectl.go (404 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package internal
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os"
"strings"
"time"
"github.com/elastic/eck-diagnostics/internal/archive"
internal_filters "github.com/elastic/eck-diagnostics/internal/filters"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/version"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/cli-runtime/pkg/printers"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/kubectl/pkg/cmd/exec"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"k8s.io/kubectl/pkg/describe"
"k8s.io/kubectl/pkg/polymorphichelpers"
"k8s.io/kubectl/pkg/util/openapi"
)
// Kubectl provides utilities based on the kubectl API.
type Kubectl struct {
defaultNamespace string
config *rest.Config
*kubernetes.Clientset
factory cmdutil.Factory
openAPISchema openapi.Resources
out io.Writer
errOut io.Writer
verbose bool
}
// NewKubectl creates a new instance of Kubectl.
func NewKubectl(kubeConfig string, verbose bool) (*Kubectl, error) {
confFlags := genericclioptions.NewConfigFlags(true)
if kubeConfig != "" {
confFlags.KubeConfig = &kubeConfig
}
matchVersionFlags := cmdutil.NewMatchVersionFlags(confFlags)
factory := cmdutil.NewFactory(matchVersionFlags)
openAPISchema, err := factory.OpenAPISchema()
if err != nil {
return nil, fmt.Errorf("failed to retrieve OpenAPI schema: %w", err)
}
config, err := factory.ToRESTConfig()
if err != nil {
return nil, err
}
clientSet, err := factory.KubernetesClientSet()
if err != nil {
return nil, err
}
return &Kubectl{
Clientset: clientSet,
defaultNamespace: "default",
config: config,
factory: factory,
openAPISchema: openAPISchema,
out: os.Stdout,
errOut: os.Stderr,
verbose: verbose,
}, nil
}
// CheckNamespaces verifies that all namespaces in nss exist.
func (c Kubectl) CheckNamespaces(ctx context.Context, nss []string) error {
clientSet, err := c.factory.KubernetesClientSet()
if err != nil {
return err
}
for _, ns := range nss {
if _, err := clientSet.CoreV1().Namespaces().Get(ctx, ns, metav1.GetOptions{}); err != nil {
return err
}
}
return nil
}
func (c Kubectl) Copy(nsn types.NamespacedName, container string, path string, recordErr func(error)) (*io.PipeReader, error) {
execErrOut := io.Discard
if c.verbose {
execErrOut = c.errOut
}
reader, outStream := io.Pipe()
options := &exec.ExecOptions{
StreamOptions: exec.StreamOptions{
IOStreams: genericclioptions.IOStreams{
In: nil,
Out: outStream,
ErrOut: execErrOut,
},
Namespace: nsn.Namespace,
PodName: nsn.Name,
ContainerName: container,
},
Config: c.config,
PodClient: c.CoreV1(),
Command: []string{"tar", "cf", "-", path},
Executor: &exec.DefaultRemoteExecutor{},
}
go func() {
defer func() {
// TODO: this routine never terminates in my experiments and this code never runs
// we are effectively leaking go routines for every diagnostic we run
outStream.Close()
}()
err := options.Run()
if err != nil {
recordErr(err)
return
}
}()
return reader, nil
}
func (c Kubectl) Exec(nsn types.NamespacedName, containerName string, cmd ...string) error {
execErrOut := io.Discard
if c.verbose {
execErrOut = c.errOut
}
options := &exec.ExecOptions{
StreamOptions: exec.StreamOptions{
IOStreams: genericclioptions.IOStreams{
In: nil,
Out: nil,
ErrOut: execErrOut,
},
Namespace: nsn.Namespace,
PodName: nsn.Name,
ContainerName: containerName,
},
Config: c.config,
PodClient: c.CoreV1(),
Command: cmd,
Executor: &exec.DefaultRemoteExecutor{},
}
return options.Run()
}
// GetByLabel retrieves the K8s objects of type resource in namespace and marshals them into the writer w.
// If filters is not empty, this will only return resources within the cluster that its labels match
// at least one of the filter's label selectors.
func (c Kubectl) GetByLabel(resource, namespace string, filters internal_filters.Filters, w io.Writer) error {
return c.getFiltered(resource, namespace, w,
func(object metav1.Object) bool {
return filters.Matches(object.GetLabels())
},
filters.Empty())
}
// GetByName retrieves the K8s objects of type resource in namespace and marshals them into the writer w.
// If filters is not empty, this will only return resources within the cluster that its name matches
// at least one of the filter's type+name pair.
func (c Kubectl) GetByName(resource, namespace string, filters internal_filters.Filters, w io.Writer) error {
return c.getFiltered(resource, namespace, w,
func(object metav1.Object) bool {
return filters.Contains(object.GetName(), resource)
},
filters.Empty())
}
func (c Kubectl) getFiltered(resource, namespace string, w io.Writer, filter func(object metav1.Object) bool, skipFilter bool) error {
r, err := c.getResources(resource, namespace)
if err != nil {
return err
}
printer, err := printers.NewTypeSetter(scheme.Scheme).WrapToPrinter(&printers.JSONPrinter{}, nil)
if err != nil {
return err
}
obj, err := r.Object()
if err != nil {
return err
}
// If there are no filters, simply return the unfiltered resources.
if skipFilter {
return printer.PrintObj(obj, w)
}
// Otherwise, convert the returned resource to a List, and filter for any matching objects
// using the provided filter func.
var list *corev1.List
list, ok := obj.(*corev1.List)
if !ok {
return fmt.Errorf("while converting returned object (%T) to list", obj)
}
filtered := corev1.List{}
for _, item := range list.Items {
obj, err := meta.Accessor(item.Object)
if err != nil {
return fmt.Errorf("while accessing metadata for %s: %w", item.Object.GetObjectKind().GroupVersionKind().String(), err)
}
if filter(obj) {
filtered.Items = append(filtered.Items, item)
}
}
return printer.PrintObj(&filtered, w)
}
// getResources retrieves the K8s objects of type resource and returns a resource.Result.
func (c Kubectl) getResources(resource string, namespace string) (*resource.Result, error) {
r := c.factory.NewBuilder().
Unstructured().
NamespaceParam(namespace).DefaultNamespace().AllNamespaces(false).
ResourceTypeOrNameArgs(true, resource).
ContinueOnError().
Latest().
Flatten().
Do()
r.IgnoreErrors(apierrors.IsNotFound)
if err := r.Err(); err != nil {
return nil, err
}
return r, nil
}
// getResourcesMatching retrieves the K8s objects of type resource matching label selector and returns a resource.Result.
func (c Kubectl) getResourcesMatching(resource string, namespace string, selector string) (*resource.Result, error) {
r := c.factory.NewBuilder().
Unstructured().
NamespaceParam(namespace).DefaultNamespace().AllNamespaces(false).
ResourceTypeOrNameArgs(true, resource).
LabelSelector(selector).
ContinueOnError().
Latest().
Flatten().
Do()
r.IgnoreErrors(apierrors.IsNotFound)
if err := r.Err(); err != nil {
return nil, err
}
return r, nil
}
// GetMeta retrieves the metadata for the K8s objects of type resource and marshals them into writer w.
// It tries to elide sensitive data like secret contents or kubectl last-applied configuration annotations.
func (c Kubectl) GetMeta(resource, namespace string, w io.Writer) error {
r := c.factory.NewBuilder().
Unstructured().
NamespaceParam(namespace).DefaultNamespace().AllNamespaces(false).
ResourceTypeOrNameArgs(true, resource).
ContinueOnError().
Latest().
Flatten().
Do()
r.IgnoreErrors(apierrors.IsNotFound)
if err := r.Err(); err != nil {
return err
}
type MetaList struct {
Items []interface{}
}
var metas MetaList
metaAccess := meta.NewAccessor()
infos, err := r.Infos()
if err != nil {
return err
}
for i := range infos {
obj := infos[i].Object
annotations, err := metaAccess.Annotations(obj)
if err != nil {
return err
}
// last-applied-configuration can contain sensitive data let's remove it
delete(annotations, corev1.LastAppliedConfigAnnotation)
if err := metaAccess.SetAnnotations(obj, annotations); err != nil {
return err
}
unstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return err
}
// remove the actual secret data
delete(unstructured, "data")
// or spec for other objects
delete(unstructured, "spec")
metas.Items = append(metas.Items, unstructured)
}
bytes, err := json.MarshalIndent(metas, "", " ")
if err != nil {
return err
}
_, err = w.Write(bytes)
return err
}
// Describe mimics "kubectl describe" and writes the result to writer w.
func (c Kubectl) Describe(resource, prefix, namespace string, w io.Writer) error {
r := c.factory.NewBuilder().
Unstructured().
NamespaceParam(namespace).DefaultNamespace().AllNamespaces(false).
ResourceTypeOrNameArgs(true, resource).
ContinueOnError().
Latest().
Flatten().
Do()
if err := r.Err(); err != nil {
return err
}
infos, err := r.Infos()
if err != nil {
return err
}
for _, info := range infos {
if !strings.HasPrefix(info.Name, prefix) {
continue
}
mapping := info.ResourceMapping()
desc, err := describe.Describer(c.factory, mapping)
if err != nil {
return err
}
s, err := desc.Describe(info.Namespace, info.Name, describe.DescriberSettings{ShowEvents: true})
if err != nil {
return err
}
fmt.Fprintf(w, "%s\n", s)
}
return nil
}
// Logs mimics "kubectl logs -l selector" and writes the result to writers produced by out when given a filename.
func (c Kubectl) Logs(namespace string, selector string, filters internal_filters.Filters, out func(string) (io.Writer, error)) error {
builder := c.factory.NewBuilder().
WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
NamespaceParam(namespace).
SingleResourceType().ResourceTypes("pods")
if selector != "" {
builder.LabelSelector(selector)
} else {
builder.SelectAllParam(true)
}
infos, err := builder.Do().Infos()
if err != nil {
return err
}
for i := range infos {
obj := infos[i].Object
switch t := obj.(type) {
case *corev1.PodList:
for _, p := range t.Items {
// ignore pod when filters are being applied, and the pod doesn't match the filters.
if !filters.Matches(p.Labels) {
continue
}
if err := c.requestLogs(p, out); err != nil {
return err
}
}
case *corev1.Pod:
// ignore pod when filters are being applied, and the pod doesn't match the filters.
if !filters.Matches(t.Labels) {
continue
}
if err := c.requestLogs(*t, out); err != nil {
return err
}
}
}
return nil
}
// requestLogs requests the logs for pod and writes the result to writers produced by out when given a filename.
func (c Kubectl) requestLogs(pod corev1.Pod, out func(string) (io.Writer, error)) error {
// if Pod not in running state let's not extract logs, trying to get logs from previous container does not seem to work
// reliably and might lead to extra misleading noise in the diagnostic data
if pod.Status.Phase != corev1.PodRunning {
return nil
}
logFn := polymorphichelpers.LogsForObjectFn
reqs, err := logFn(c.factory, &pod, &corev1.PodLogOptions{}, 20*time.Second, true)
if err != nil {
return err
}
writer, err := out(archive.Path(pod.Namespace, "pod", pod.Name, "logs.txt"))
if err != nil {
return err
}
for _, r := range reqs {
if err := streamLogs(types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, r, writer); err != nil {
return err
}
}
return nil
}
// streamLogs when given a ResponseWrapper streams the result to out adding textual start and end markers.
func streamLogs(nsn types.NamespacedName, request rest.ResponseWrapper, out io.Writer) error {
stream, err := request.Stream(context.Background())
if err != nil {
return err
}
defer stream.Close()
_, _ = out.Write([]byte(fmt.Sprintf("==== START logs for %s ====\n", nsn.String())))
defer func() {
_, _ = out.Write([]byte(fmt.Sprintf("==== END logs for %s ====\n", nsn.String())))
}()
r := bufio.NewReader(stream)
for {
bytes, err := r.ReadBytes('\n')
if _, err := out.Write(bytes); err != nil {
return err
}
if err != nil {
if err != io.EOF {
return err
}
return nil
}
}
}
// versionInfo exists to marshal both eck-diagnostics version information and K8s server version information.
type versionInfo struct {
DiagnosticsVersion DiagnosticsVersion
ServerVersion *version.Info
}
// Version is inspired by "kubectl version" but includes version information about this tool in addition to K8s
// server version information.
func (c Kubectl) Version(out io.Writer) error {
v := versionInfo{
DiagnosticsVersion: about(),
}
client, err := c.factory.ToDiscoveryClient()
if err != nil {
return err
}
// mirroring kubectl behaviour to fetch fresh data from server
client.Invalidate()
serverVersion, err := client.ServerVersion()
if err != nil {
return err
}
v.ServerVersion = serverVersion
bytes, err := json.MarshalIndent(v, "", " ")
if err != nil {
return err
}
_, err = out.Write(bytes)
return err
}