internal/kubectl/kubectl_apply.go (188 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package kubectl import ( "context" "fmt" "strings" "time" "gopkg.in/yaml.v3" "helm.sh/helm/v3/pkg/kube" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/cli-runtime/pkg/genericclioptions" kresource "k8s.io/cli-runtime/pkg/resource" "k8s.io/client-go/rest" "github.com/elastic/elastic-package/internal/logger" ) const readinessTimeout = 10 * time.Minute type resource struct { APIVersion string `yaml:"apiVersion"` Kind string `yaml:"kind"` Metadata metadata `yaml:"metadata"` Status *status `yaml:"status"` Items []resource `yaml:"items"` } func (r resource) String() string { return fmt.Sprintf("%s (kind: %s, namespace: %s)", r.Metadata.Name, r.Kind, r.Metadata.Namespace) } type metadata struct { Name string `yaml:"name"` Namespace string `yaml:"namespace"` } type status struct { Conditions *[]condition } //lint:ignore U1000 unused, but let's keep it by now. func (s status) isReady() (*condition, bool) { if s.Conditions == nil { return nil, false // safe fallback } for _, c := range *s.Conditions { if (c.Type == "Ready" || c.Type == "Available") && !strings.Contains(c.Message, "does not have minimum availability") { return &c, true } } return nil, false } type condition struct { LastUpdateTime time.Time `yaml:"lastUpdateTime"` Message string `yaml:"message"` Type string `yaml:"type"` } func (c condition) String() string { return fmt.Sprintf("%s (type: %s, time: %v)", c.Message, c.Type, c.LastUpdateTime) } // Apply function adds resources to the Kubernetes cluster based on provided definitions. func Apply(ctx context.Context, definitionsPath []string) error { logger.Debugf("Apply Kubernetes custom definitions") out, err := modifyKubernetesResources(ctx, "apply", definitionsPath) if err != nil { return fmt.Errorf("can't modify Kubernetes resources (apply): %w", err) } logger.Debugf("Handle \"apply\" command output") err = handleApplyCommandOutput(out) if err != nil { return fmt.Errorf("can't handle command output: %w", err) } return nil } // ApplyStdin function adds resources to the Kubernetes cluster based on provided stdin. func ApplyStdin(ctx context.Context, input []byte) error { logger.Debugf("Apply Kubernetes stdin") out, err := applyKubernetesResourcesStdin(ctx, input) if err != nil { return fmt.Errorf("can't modify Kubernetes resources (apply stdin): %w", err) } logger.Debugf("Handle \"apply\" command output") err = handleApplyCommandOutput(out) if err != nil { return fmt.Errorf("can't handle command output: %w", err) } return nil } func handleApplyCommandOutput(out []byte) error { logger.Debugf("Extract resources from command output") resources, err := extractResources(out) if err != nil { return fmt.Errorf("can't extract resources: %w", err) } logger.Debugf("Wait for ready resources") err = waitForReadyResources(resources) if err != nil { return fmt.Errorf("resources are not ready: %w", err) } return nil } func waitForReadyResources(resources []resource) error { var resList kube.ResourceList for _, r := range resources { resInfo, err := createResourceInfo(r) if err != nil { return fmt.Errorf("can't fetch resource info: %w", err) } resList = append(resList, resInfo) } kubeClient := kube.New(nil) kubeClient.Log = func(s string, i ...interface{}) { logger.Debugf(s, i...) } // In case of elastic-agent daemonset Wait will not work as expected // because in single node clusters one pod of the daemonset can always // be unavailable (DaemonSet.spec.updateStrategy.rollingUpdate.maxUnavailable defaults to 1). // daemonSetReady will return true regardless of the pod not being ready yet. // Can be solved with multi-node clusters. // TODO: Support context cancelation in this wait. We rely on a helm waiter // that doesn't support it. err := kubeClient.Wait(resList, readinessTimeout) if err != nil { return fmt.Errorf("waiter failed: %w", err) } return nil } func extractResources(output []byte) ([]resource, error) { r, err := extractResource(output) if err != nil { return nil, err } if len(r.Items) == 0 { return []resource{*r}, nil } return r.Items, nil } func extractResource(output []byte) (*resource, error) { var r resource err := yaml.Unmarshal(output, &r) if err != nil { return nil, fmt.Errorf("can't unmarshal command output: %w", err) } return &r, nil } func createResourceInfo(r resource) (*kresource.Info, error) { scope := meta.RESTScopeNamespace if r.Metadata.Namespace == "" { scope = meta.RESTScopeRoot } restClient, err := createRESTClientForResource(r) if err != nil { return nil, fmt.Errorf("can't create REST client for resource: %w", err) } var group string var version string if !strings.Contains(r.APIVersion, "/") { version = r.APIVersion } else { i := strings.Index(r.APIVersion, "/") group = r.APIVersion[:i] version = r.APIVersion[i+1:] } resInfo := &kresource.Info{ Name: r.Metadata.Name, Namespace: r.Metadata.Namespace, Mapping: &meta.RESTMapping{ GroupVersionKind: schema.GroupVersionKind{ Group: group, Version: version, Kind: strings.ToLower(r.Kind), }, Resource: schema.GroupVersionResource{ Group: group, Version: version, Resource: r.Kind + "s"}, // "s" is for plural Scope: scope, }, Client: restClient, } logger.Debugf("Sync resource info: %s (kind: %s, namespace: %s)", r.Metadata.Name, r.Kind, r.Metadata.Namespace) err = resInfo.Get() if err != nil { return nil, fmt.Errorf("can't sync resource info: %w", err) } return resInfo, nil } func createRESTClientForResource(r resource) (*rest.RESTClient, error) { restClientGetter := genericclioptions.NewConfigFlags(true) restConfig, err := restClientGetter.ToRESTConfig() if err != nil { return nil, fmt.Errorf("can't convert to REST config: %w", err) } restConfig.NegotiatedSerializer = kresource.UnstructuredPlusDefaultContentConfig().NegotiatedSerializer if !strings.Contains(r.APIVersion, "/") { restConfig.APIPath = "/api/" + r.APIVersion } else { restConfig.APIPath = "/apis/" + r.APIVersion } restClient, err := rest.UnversionedRESTClientFor(restConfig) if err != nil { return nil, fmt.Errorf("can't create unversioned REST client: %w", err) } return restClient, nil }