internal/diag.go (249 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package internal import ( "bytes" "context" "fmt" "io" "os" "os/signal" "path/filepath" "strings" "time" "k8s.io/apimachinery/pkg/util/sets" "github.com/elastic/eck-diagnostics/internal/archive" "github.com/elastic/eck-diagnostics/internal/filters" "github.com/elastic/eck-diagnostics/internal/log" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/version" _ "k8s.io/client-go/plugin/pkg/client/auth/azure" // auth on azure (deprecated) _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // auth on gke (deprecated) _ "k8s.io/client-go/plugin/pkg/client/auth/oidc" // auth flow for OpenID Connect (deprecated) ) var logger = log.Logger // Params is a collection of parameters controlling the extraction of diagnostic data. // See the main command for explanation of individual parameters. type Params struct { DiagnosticImage string ECKVersion string Kubeconfig string OperatorNamespaces []string ResourcesNamespaces []string OutputDir string OutputName string RunStackDiagnostics bool RunAgentDiagnostics bool Verbose bool StackDiagnosticsTimeout time.Duration Filters filters.Filters LogFilters filters.Filters } // AllNamespaces returns a slice containing all namespaces from which we want to extract diagnostic data. func (dp Params) AllNamespaces() []string { nss := make([]string, 0, len(dp.ResourcesNamespaces)+len(dp.OperatorNamespaces)) nss = append(nss, dp.ResourcesNamespaces...) nss = append(nss, dp.OperatorNamespaces...) return nss } // Run extracts diagnostic information based on the given params. // It produces a zip file with the contents as a side effect. func Run(params Params) error { logger.Printf("ECK diagnostics with parameters: %+v", params) stopCh := make(chan struct{}) sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, os.Interrupt) go func() { s := <-sigCh logger.Printf("Aborting: %v received", s) close(stopCh) }() kubectl, err := NewKubectl(params.Kubeconfig, params.Verbose) if err != nil { return err } if err := kubectl.CheckNamespaces(context.Background(), params.AllNamespaces()); err != nil { return err } clientSet, err := kubectl.factory.KubernetesClientSet() if err != nil { return err } zipFileName := filepath.Join(params.OutputDir, params.OutputName) zipFile, err := archive.NewZipFile(zipFileName, about().Version, logger) if err != nil { return err } // Filters is intentionally empty in many of these, as Elastic labels // are not applied to these resources. zipFile.Add(map[string]func(io.Writer) error{ "version.json": func(writer io.Writer) error { return kubectl.Version(writer) }, "nodes.json": func(writer io.Writer) error { return kubectl.GetByLabel("nodes", "", filters.Empty, writer) }, "podsecuritypolicies.json": func(writer io.Writer) error { return kubectl.GetByLabel("podsecuritypolicies", "", filters.Empty, writer) }, "storageclasses.json": func(writer io.Writer) error { return kubectl.GetByLabel("storageclasses", "", filters.Empty, writer) }, "clusterroles.txt": func(writer io.Writer) error { return kubectl.Describe("clusterroles", "elastic", "", writer) }, "clusterrolebindings.txt": func(writer io.Writer) error { return kubectl.Describe("clusterrolebindings", "elastic", "", writer) }, }) var operatorLabels []labels.Set operatorVersions := make([]*version.Version, 0, len(params.OperatorNamespaces)) for _, ns := range params.OperatorNamespaces { // Find the label in use by operator in this namespace and add this to // the set of filters to ensure we always retrieve the objects // associated with the ECK operator. operatorLabel, err := getOperatorLabel(kubectl, ns) if err != nil || operatorLabel == nil { logger.Printf("Could not find label corresponding to ECK Operator in namespace %s: potentially not including operator data in diagnostics", ns) } else { operatorLabels = append(operatorLabels, operatorLabel) } operatorVersions = append(operatorVersions, detectECKVersion(clientSet, ns, params.ECKVersion)) } maxOperatorVersion := maxVersion(operatorVersions) logVersion(maxOperatorVersion) allNamespaces := sets.New(params.ResourcesNamespaces...) allNamespaces.Insert(params.OperatorNamespaces...) logsLabels := []string{ "common.k8s.elastic.co/type=elasticsearch", "common.k8s.elastic.co/type=kibana", "common.k8s.elastic.co/type=apm-server", // the below were introduced in later version but label selector will just return no result: "common.k8s.elastic.co/type=enterprise-search", // 1.2.0 "common.k8s.elastic.co/type=beat", // 1.2.0 "common.k8s.elastic.co/type=agent", // 1.4.0 "common.k8s.elastic.co/type=maps", // 1.6.0 "common.k8s.elastic.co/type=logstash", // 2.8.0 } operatorSelectors := make([]labels.Selector, len(operatorLabels)) for i, label := range operatorLabels { operatorSelectors[i] = label.AsSelector() logsLabels = append(logsLabels, label.AsSelector().String()) } // always collect operator information even in the presence of filters operatorFilters := filters.LabelFilter(operatorSelectors) // user defined type filters or any resource related to the operator namespaceFilters := filters.Or(params.Filters, operatorFilters) // for logs respect user defined log filters but add the operator filters here too so that the AND works out and we always have the operator logs logsFilters := filters.And(filters.Or(params.LogFilters, operatorFilters), namespaceFilters) LOOP: for ns := range allNamespaces { select { case <-stopCh: break LOOP default: } logger.Printf("Extracting Kubernetes diagnostics from %s\n", ns) zipFile.Add(getResources(kubectl.GetByLabel, ns, namespaceFilters, []string{ "statefulsets", "replicasets", "deployments", "daemonsets", "pods", "persistentvolumeclaims", "services", "endpoints", "configmaps", "controllerrevisions", })) zipFile.Add(getResources(kubectl.GetByName, ns, namespaceFilters, []string{ "kibana", "elasticsearch", "apmserver", })) // Filters is intentionally empty here, as Elastic labels // are not applied to these resources. zipFile.Add(getResources(kubectl.GetByLabel, ns, filters.Empty, []string{ "persistentvolumes", "events", "networkpolicies", "serviceaccount", })) if maxOperatorVersion.AtLeast(version.MustParseSemantic("1.2.0")) { zipFile.Add(getResources(kubectl.GetByName, ns, namespaceFilters, []string{ "enterprisesearch", "beat", })) } if maxOperatorVersion.AtLeast(version.MustParseSemantic("1.4.0")) { zipFile.Add(getResources(kubectl.GetByName, ns, namespaceFilters, []string{ "agent", })) } if maxOperatorVersion.AtLeast(version.MustParseSemantic("1.6.0")) { zipFile.Add(getResources(kubectl.GetByName, ns, namespaceFilters, []string{ "elasticmapsserver", })) } if maxOperatorVersion.AtLeast(logstashMinVersion) { zipFile.Add(getResources(kubectl.GetByName, ns, namespaceFilters, []string{ "logstash", })) } zipFile.Add(map[string]func(io.Writer) error{ archive.Path(ns, "secrets.json"): func(writer io.Writer) error { return kubectl.GetMeta("secrets", ns, writer) }, }) getLogs(kubectl, zipFile, ns, logsFilters, logsLabels...) if params.RunStackDiagnostics { runStackDiagnostics( kubectl, ns, zipFile, params.Verbose, params.DiagnosticImage, params.StackDiagnosticsTimeout, stopCh, namespaceFilters, maxOperatorVersion, ) } if params.RunAgentDiagnostics { runAgentDiagnostics(kubectl, ns, zipFile, params.Verbose, stopCh, namespaceFilters) } } addDiagnosticLogToArchive(zipFile, &log.Buffer) if err := zipFile.Close(); err != nil { // log the errors here and don't return them to the invoking command as we don't want usage help to be // printed in this case logger.Println(err.Error()) } logger.Printf("ECK diagnostics written to %s\n", zipFileName) return nil } // getOperatorLabel will attempt to find the labels associated with the ECK Operator, returning any errors. // // 1) if using yaml manifests or OLM it will always be "control-plane=elastic-operator" // 2) if using Helm, find label with key: helm.sh/chart, and value containing "eck-operator-*" func getOperatorLabel(kc *Kubectl, ns string) (labels.Set, error) { // try the most common case first pure YAML and OLM installations will have this label pods, err := kc.Clientset.CoreV1().Pods(ns).List(context.Background(), v1.ListOptions{LabelSelector: "control-plane=elastic-operator"}) if err == nil && len(pods.Items) > 0 { return labels.Set{"control-plane": "elastic-operator"}, nil } // for Helm use the service account to be independent of Deployment or StatefulSet (even though all Helm installs should use a StatefulSet) saList, err := kc.Clientset.CoreV1().ServiceAccounts(ns).List(context.Background(), v1.ListOptions{}) if err != nil { return nil, fmt.Errorf("while retrieving list of serviceaccounts in ns '%s': %w", ns, err) } for _, sa := range saList.Items { for k, v := range sa.Labels { if k == "helm.sh/chart" && strings.Contains(v, "eck-operator") { // The helm.sh/chart=eck-operator-* label isn't propagated down to sub-resources // so use the app.kubernetes.io/name label, which is propagated. return labels.Set{"app.kubernetes.io/name": sa.Labels["app.kubernetes.io/name"]}, nil } } } return nil, fmt.Errorf("unable to find any resources belonging to the eck operator") } // addDiagnosticLogToArchive adds the passed bytes.Buffer reference as eck-diagnostics.log to the given archive. // The underlying assumption being that the number of log lines produced by this tool is small enough to allow them to // be kept in memory. func addDiagnosticLogToArchive(zipFile *archive.ZipFile, logContents *bytes.Buffer) { writer, err := zipFile.Create("eck-diagnostics.log") if err != nil { zipFile.AddError(err) return } _, err = writer.Write(logContents.Bytes()) zipFile.AddError(err) } // getLogs extracts logs from all Pods that match the given selectors in the namespace ns and adds them to zipFile. func getLogs(k *Kubectl, zipFile *archive.ZipFile, ns string, filters filters.Filters, selector ...string) { for _, s := range selector { if err := k.Logs(ns, s, filters, zipFile.Create); err != nil { zipFile.AddError(err) } } } // getResources produces a map of filenames to functions that will when invoked retrieve the resources identified by rs // and add write them to a writer passed to said functions. func getResources(f func(string, string, filters.Filters, io.Writer) error, ns string, filters filters.Filters, rs []string) map[string]func(io.Writer) error { m := map[string]func(io.Writer) error{} for _, r := range rs { resource := r m[archive.Path(ns, resource+".json")] = func(w io.Writer) error { return f(resource, ns, filters, w) } } return m }