internal/stackdiag.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package internal

import (
"bytes"
"context"
_ "embed"
"errors"
"fmt"
"strings"
"sync"
"text/template"
"time"
"github.com/ghodss/yaml"
"golang.org/x/text/cases"
"golang.org/x/text/language"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/cli-runtime/pkg/resource"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/kubectl/pkg/util/podutils"
"k8s.io/utils/ptr"
"github.com/elastic/eck-diagnostics/internal/archive"
"github.com/elastic/eck-diagnostics/internal/extraction"
internal_filters "github.com/elastic/eck-diagnostics/internal/filters"
)
const (
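// DiagnosticImage is the default support-diagnostics image run by the stack diagnostic Pods.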
DiagnosticImage = "docker.elastic.co/eck-dev/support-diagnostics:9.1.0"
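// podOutputDir is the directory inside the diagnostic Pod from which the diagnostic results are extracted.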
podOutputDir = "/diagnostic-output"
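// podMainContainerName is the name of the container in the diagnostic Pod that runs the support-diagnostics tool.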
podMainContainerName = "stack-diagnostics"
// names used to identify different stack diagnostic job types (need to match the names of the corresponding CRDs)
elasticsearchJob = "elasticsearch"
kibanaJob = "kibana"
logstashJob = "logstash"
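// diagnosticsUsername is the dedicated diagnostics user (also the key in the <cluster>-es-internal-users secret) tried before falling back to the elastic user.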
diagnosticsUsername = "elastic-internal-diagnostics"
)
var (
//go:embed job.tpl.yml
jobTemplate string
// jobPollingInterval is the resync period of the informer that watches for Pod status changes.
jobPollingInterval = 10 * time.Second
// logstashMinVersion is the ECK version in which Logstash support was introduced.
logstashMinVersion = version.MustParseSemantic("2.8.0")
)
// supportedStackDiagTypesFor returns the list of stack applications supported by elastic/support-diagnostics for the given ECK version.
func supportedStackDiagTypesFor(eckVersion *version.Version) []string {
supportedStackDiagTypes := []string{elasticsearchJob, kibanaJob}
if eckVersion.AtLeast(logstashMinVersion) {
supportedStackDiagTypes = append(supportedStackDiagTypes, logstashJob)
}
return supportedStackDiagTypes
}
// diagJob represents a diagnostic Pod whose job it is to extract diagnostic data from a single Elastic Stack application (Elasticsearch, Kibana or Logstash).
type diagJob struct {
sync.RWMutex
extraction.RemoteSource
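// d records whether the job has completed; access is guarded by the embedded RWMutex.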
d bool
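// timer enforces the per-job timeout and is stopped once the job is marked done.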
timer *time.Timer
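// done notifies the timeout goroutine that the job has finished.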
done chan struct{}
}
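// StartTimer starts the timer enforcing the job timeout and returns the channel it fires on.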
func (d *diagJob) StartTimer(dur time.Duration) <-chan time.Time {
d.Lock()
defer d.Unlock()
d.timer = time.NewTimer(dur)
return d.timer.C
}
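// Done reports whether the job has completed.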
func (d *diagJob) Done() bool {
d.RLock()
defer d.RUnlock()
return d.d
}
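// MarkDone marks the job as completed, stops the timeout timer and signals the timeout goroutine.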
func (d *diagJob) MarkDone() {
d.Lock()
defer d.Unlock()
d.d = true
if d.timer != nil {
// We are OK with not draining the timer channel here. We do not want to reuse the timer, and we don't want to block
// under any circumstance. The only point is to prevent the timer from firing once the job is complete.
d.timer.Stop()
}
d.done <- struct{}{}
}
// diagJobState captures the state of running a set of jobs to extract diagnostics from Elastic Stack applications.
type diagJobState struct {
ns string
kubectl *Kubectl
informer cache.SharedInformer
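// jobs indexes the scheduled diagnostic jobs by the name of their Pod.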
jobs map[string]*diagJob
context context.Context
cancelFunc context.CancelFunc
verbose bool
diagnosticImage string
jobTimeout time.Duration
}
// newDiagJobState creates a new state struct to run diagnostic Pods.
func newDiagJobState(k *Kubectl, ns string, verbose bool, image string, jobTimeout time.Duration, stopCh chan struct{}) *diagJobState {
ctx, cancelFunc := context.WithCancel(context.Background())
factory := informers.NewSharedInformerFactoryWithOptions(
k,
jobPollingInterval,
informers.WithNamespace(ns),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = "app.kubernetes.io/name=eck-diagnostics"
}))
state := &diagJobState{
jobs: map[string]*diagJob{},
ns: ns,
kubectl: k,
informer: factory.Core().V1().Pods().Informer(),
cancelFunc: cancelFunc,
context: ctx,
verbose: verbose,
diagnosticImage: image,
jobTimeout: jobTimeout,
}
go func() {
<-stopCh
cancelFunc()
}()
return state
}
// scheduleJob creates a Pod to extract diagnostic data from the Elasticsearch, Kibana or Logstash resource called resourceName.
func (ds *diagJobState) scheduleJob(typ, esSecretName, esSecretKey, resourceName string, tls bool) error {
podName := fmt.Sprintf("%s-%s-diag", resourceName, typ)
tpl, err := template.New("job").Parse(jobTemplate)
if err != nil {
return err
}
diagnosticType, svcSuffix := diagnosticTypeForApplication(typ)
buffer := new(bytes.Buffer)
data := map[string]interface{}{
"PodName": podName,
"DiagnosticImage": ds.diagnosticImage,
"Namespace": ds.ns,
"ESSecretName": esSecretName,
"ESSecretKey": esSecretKey,
"SVCName": fmt.Sprintf("%s-%s", resourceName, svcSuffix),
"Type": diagnosticType,
"TLS": tls,
"OutputDir": podOutputDir,
"MainContainerName": podMainContainerName,
}
err = tpl.Execute(buffer, data)
if err != nil {
return err
}
var pod corev1.Pod
err = yaml.Unmarshal(buffer.Bytes(), &pod)
if err != nil {
return err
}
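// Remove any left-over diagnostic Pod with the same name from a previous run before creating a new one.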
err = ds.kubectl.CoreV1().Pods(ds.ns).Delete(context.Background(), podName, metav1.DeleteOptions{GracePeriodSeconds: ptr.To[int64](0)})
if err != nil && !apierrors.IsNotFound(err) {
return err
}
_, err = ds.kubectl.CoreV1().Pods(ds.ns).Create(context.Background(), &pod, metav1.CreateOptions{})
if err != nil {
return err
}
var job = diagJob{
RemoteSource: extraction.RemoteSource{
Namespace: ds.ns,
PodName: podName,
Typ: typ,
ResourceName: resourceName,
PodOutputDir: podOutputDir,
},
done: make(chan struct{}, 1),
}
// start a dedicated timer for each job and terminate the job when the timer expires.
go func(j *diagJob) {
timerChan := j.StartTimer(ds.jobTimeout)
select {
case <-timerChan:
logger.Printf("Diagnostic job for %s %s/%s timed out, terminating", j.Typ, j.Namespace, j.ResourceName)
if err := ds.terminateJob(context.Background(), j); err != nil {
logger.Printf("while terminating job %s", err.Error())
}
case <-j.done:
// We use a separate done signal so that goroutines for jobs that finish before their timeout exit immediately
// instead of lingering until the overall program terminates.
}
}(&job)
ds.jobs[podName] = &job
return nil
}
// diagnosticTypeForApplication returns the diagnosticType as expected by the stack diagnostics tool and the suffix
// used by ECK in service names for the given application type.
func diagnosticTypeForApplication(typ string) (string, string) {
switch typ {
case elasticsearchJob:
return "api", "es-http"
case kibanaJob:
return "kibana-api", "kb-http"
case logstashJob:
return "logstash-api", "ls-api"
}
panic("programming error: unknown type")
}
// extractFromRemote runs the equivalent of "kubectl cp" to extract the stack diagnostics from a remote Pod.
func (ds *diagJobState) extractFromRemote(pod *corev1.Pod, file *archive.ZipFile) {
job, found := ds.jobs[pod.Name]
if !found {
file.AddError(fmt.Errorf("no job for Pod %s/%s", pod.Namespace, pod.Name))
return
}
nsn := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
reader, err := ds.kubectl.Copy(nsn, podMainContainerName, podOutputDir, file.AddError)
if err != nil {
file.AddError(err)
return
}
if err := extraction.UntarIntoZip(reader, job.RemoteSource, file, ds.verbose); err != nil {
file.AddError(err)
return
}
err = ds.completeJob(job)
if err != nil {
file.AddError(err)
return
}
}
// extractJobResults runs an informer to be notified of Pod status changes and extract diagnostic data from any Pod
// that has reached running state.
func (ds *diagJobState) extractJobResults(file *archive.ZipFile) {
_, err := ds.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if pod, ok := obj.(*corev1.Pod); ok && ds.verbose {
logger.Printf("Diagnostic pod %s/%s added\n", pod.Namespace, pod.Name)
}
},
UpdateFunc: func(_, newObj interface{}) {
pod, ok := newObj.(*corev1.Pod)
if !ok {
logger.Printf("Unexpected %v, expected type Pod\n", newObj)
return
}
job, found := ds.jobs[pod.Name]
if !found {
logger.Printf("Unexpected no record for Pod %s/%s\n", pod.Namespace, pod.Name)
return
}
if job.Done() {
return
}
switch pod.Status.Phase {
case corev1.PodPending:
if err := ds.detectImageErrors(pod); err != nil {
file.AddError(err)
file.AddError(ds.terminateJob(ds.context, job))
}
case corev1.PodUnknown:
logger.Printf("Unexpected diagnostic Pod %s/%s in unknown phase", pod.Namespace, pod.Name)
case corev1.PodRunning:
if podutils.IsPodReady(pod) {
ds.extractFromRemote(pod, file)
}
case corev1.PodSucceeded:
file.AddError(fmt.Errorf("unexpected: Pod %s/%s succeeded", pod.Namespace, pod.Name))
file.AddError(ds.completeJob(job))
case corev1.PodFailed:
file.AddError(fmt.Errorf("unexpected: Pod %s/%s failed", pod.Namespace, pod.Name))
file.AddError(ds.completeJob(job))
}
},
DeleteFunc: func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
logger.Printf("Unexpected %v, expected type Pod", obj)
return
}
if ds.verbose {
logger.Printf("%s/%s deleted", pod.Namespace, pod.Name)
}
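// Once all jobs are done their Pods have been deleted, and the informer can be stopped by cancelling the context.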
done := true
for _, j := range ds.jobs {
if !j.Done() {
done = false
}
}
if done {
ds.cancelFunc()
}
},
})
if err != nil {
file.AddError(err)
}
ds.informer.Run(ds.context.Done())
err = ds.context.Err()
// we cancel the context ourselves when we are done, so only log any other errors here, e.g. deadline exceeded
if err != nil && !errors.Is(err, context.Canceled) {
file.AddError(fmt.Errorf("extracting Elastic stack diagnostic for namespace %s: %w", ds.ns, err))
}
// Make sure any open jobs are aborted at this point. Under normal circumstances this is a no-op; if the run was
// interrupted, jobs might still be running and should be stopped now.
file.AddError(ds.abortAllJobs())
}
// abortAllJobs terminates all open jobs.
func (ds *diagJobState) abortAllJobs() error {
var errs []error
for _, j := range ds.jobs {
if !j.Done() {
logger.Printf("Aborting diagnostic extraction for %s %s/%s", j.Typ, ds.ns, j.ResourceName)
// use a new context for this cleanup as the main context might have been cancelled already
errs = append(errs, ds.terminateJob(context.Background(), j))
}
}
return utilerrors.NewAggregate(errs)
}
// completeJob is called after successful extraction; it logs the completion and terminates the job.
func (ds *diagJobState) completeJob(job *diagJob) error {
logger.Printf("%s diagnostics extracted for %s/%s\n", cases.Title(language.English).String(job.Typ), ds.ns, job.ResourceName)
return ds.terminateJob(ds.context, job)
}
// terminateJob marks the job as done and deletes the diagnostic Pod.
func (ds *diagJobState) terminateJob(ctx context.Context, job *diagJob) error {
job.MarkDone()
return ds.kubectl.CoreV1().Pods(ds.ns).Delete(ctx, job.PodName, metav1.DeleteOptions{GracePeriodSeconds: ptr.To[int64](0)})
}
// detectImageErrors tries to detect image pull errors on the diagnostic container. Callers should then terminate the
// job, as there is little chance of the image becoming available during the execution time of the tool.
func (ds *diagJobState) detectImageErrors(pod *corev1.Pod) error {
for _, status := range pod.Status.ContainerStatuses {
if status.State.Waiting != nil && strings.Contains(status.State.Waiting.Reason, "Image") {
return fmt.Errorf("failed running stack diagnostics: %s:%s", status.State.Waiting.Reason, status.State.Waiting.Message)
}
}
return nil
}
// runStackDiagnostics extracts diagnostic data from all clusters in the given namespace ns using the official
// Elasticsearch support diagnostics.
func runStackDiagnostics(
k *Kubectl, ns string,
zipFile *archive.ZipFile,
verbose bool,
image string,
jobTimeout time.Duration,
stopCh chan struct{},
filters internal_filters.Filters,
eckVersion *version.Version,
) {
state := newDiagJobState(k, ns, verbose, image, jobTimeout, stopCh)
for _, typ := range supportedStackDiagTypesFor(eckVersion) {
if err := scheduleJobs(k, ns, zipFile.AddError, state, typ, filters); err != nil {
zipFile.AddError(err)
return
}
}
// don't start extracting if there is nothing to do
if len(state.jobs) == 0 {
return
}
state.extractJobResults(zipFile)
}
// scheduleJobs lists all resources of type typ and schedules a diagnostic job for each of them.
func scheduleJobs(k *Kubectl, ns string, recordErr func(error), state *diagJobState, typ string, filters internal_filters.Filters) error {
resources, err := k.getResources(typ, ns)
if err != nil {
return err // not recoverable
}
return resources.Visit(func(resourceInfo *resource.Info, err error) error {
if err != nil {
// record error but continue trying for other resources
recordErr(err)
return nil
}
isTLS, esName, err := extractEsInfo(typ, ns, resourceInfo)
if err != nil {
recordErr(err)
return nil
}
resourceName := resourceInfo.Name
if !filters.Empty() && !filters.Contains(resourceName, typ) {
return nil
}
esSecretName, esSecretKey, err := determineESCredentialsSecret(k, ns, esName)
if err != nil {
// If no credentials secret was found attempt to continue with the next resource
recordErr(fmt.Errorf("while determining Elasticsearch credentials secret: %w", err))
return nil
}
recordErr(state.scheduleJob(typ, esSecretName, esSecretKey, resourceName, isTLS))
return nil
})
}
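// extractEsInfo returns whether TLS is enabled on the resource's HTTP endpoint and the name of the Elasticsearch
// cluster whose credentials secret should be used (empty for Logstash or when no elasticsearchRef is set).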
func extractEsInfo(typ string, ns string, resourceInfo *resource.Info) (bool, string, error) {
resourceName := resourceInfo.Name
es, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resourceInfo.Object)
if err != nil {
return false, "", err
}
var isTLS bool
switch typ {
case logstashJob:
// Logstash API SSL is not yet configurable via spec.http.tls, so read the setting from the config as a best effort;
// to be revisited once https://github.com/elastic/cloud-on-k8s/issues/6971 is fixed.
enabled, found, err := unstructured.NestedBool(es, "spec", "config", "api.ssl.enabled")
if err != nil {
return false, "", err
}
isTLS = found && enabled
default:
disabled, found, err := unstructured.NestedBool(es, "spec", "http", "tls", "selfSignedCertificate", "disabled")
if err != nil {
return false, "", err
}
isTLS = !(found && disabled)
}
var esName string
switch typ {
case elasticsearchJob:
esName = resourceName
case kibanaJob:
name, found, err := unstructured.NestedString(es, "spec", "elasticsearchRef", "name")
if err != nil {
return false, "", err
}
if !found || name == "" {
logger.Printf("Skipping %s/%s as elasticsearchRef is not defined", ns, resourceName)
return false, "", nil
}
esName = name
case logstashJob:
// Logstash doesn't store its credentials in Elasticsearch;
// api.auth.* settings are not yet supported.
esName = ""
default:
panic("unknown type while extracting es info")
}
return isTLS, esName, nil
}
// determineESCredentialsSecret returns the name and key of the secret holding the Elasticsearch credentials to use,
// attempting first the "elastic-internal-diagnostics" user secret and falling back to the "elastic" user secret if the
// former does not exist or does not contain the expected key. An error is returned if neither secret can be found.
func determineESCredentialsSecret(k *Kubectl, ns, esName string) (secretName, secretKey string, err error) {
diagnosticUserSecretName := fmt.Sprintf("%s-es-internal-users", esName)
secret, err := k.CoreV1().Secrets(ns).Get(context.Background(), diagnosticUserSecretName, metav1.GetOptions{})
if err == nil {
if _, ok := secret.Data[diagnosticsUsername]; ok {
return diagnosticUserSecretName, diagnosticsUsername, nil
}
}
elasticSecretName := fmt.Sprintf("%s-es-elastic-user", esName)
if _, err := k.CoreV1().Secrets(ns).Get(context.Background(), elasticSecretName, metav1.GetOptions{}); err == nil {
return elasticSecretName, "elastic", nil
}
return "", "", fmt.Errorf("no credentials secret found for Elasticsearch %s", esName)
}