pkg/controller/agent/fleet.go (312 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package agent

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
	"time"

	"github.com/go-logr/logr"
	"github.com/pkg/errors"
	"go.elastic.co/apm/module/apmhttp/v2"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/tools/record"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	agentv1alpha1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/agent/v1alpha1"
	commonv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/common/v1"
	v1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/kibana/v1"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/events"
	commonhttp "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/http"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/tracing"
	"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
	ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
	"github.com/elastic/cloud-on-k8s/v3/pkg/utils/net"
	"github.com/elastic/cloud-on-k8s/v3/pkg/utils/stringsutil"
)

const FleetTokenAnnotation = "fleet.eck.k8s.elastic.co/token" //nolint:gosec

var errNoMatchingTokenFound = errors.New("no matching active enrollment token found")

// EnrollmentAPIKeyResult wrapper for a single result in the Fleet API.
type EnrollmentAPIKeyResult struct {
	Item EnrollmentAPIKey `json:"item"`
}

// EnrollmentAPIKeyList is a wrapper for a list of enrollment tokens.
type EnrollmentAPIKeyList struct {
	Items []EnrollmentAPIKey `json:"items"`
}

// EnrollmentAPIKey is the representation of an enrollment token in the Fleet API.
type EnrollmentAPIKey struct { ID string `json:"id,omitempty"` Active bool `json:"active,omitempty"` APIKey string `json:"api_key,omitempty"` PolicyID string `json:"policy_id,omitempty"` } func (e EnrollmentAPIKey) isEmpty() bool { return !e.Active && e.ID == "" && e.APIKey == "" && e.PolicyID == "" } // PolicyList is a wrapper for a list of agent policies as returned by the Fleet API. type PolicyList struct { Items []Policy `json:"items"` } // Policy is the representation of an agent policy in the Fleet API. type Policy struct { ID string `json:"id"` IsDefault bool `json:"is_default"` IsDefaultFleetServer bool `json:"is_default_fleet_server"` Status string `json:"status"` } type fleetAPI struct { client *http.Client endpoint string username string password string kibanaVersion string log logr.Logger } func newFleetAPI(dialer net.Dialer, settings connectionSettings, logger logr.Logger) fleetAPI { return fleetAPI{ client: apmhttp.WrapClient( commonhttp.Client(dialer, settings.caCerts, 60*time.Second), apmhttp.WithClientRequestName(tracing.RequestName), apmhttp.WithClientSpanType("external.kibana"), ), kibanaVersion: settings.version, endpoint: settings.host, username: settings.credentials.Username, password: settings.credentials.Password, log: logger, } } func (f fleetAPI) request( ctx context.Context, method string, pathWithQuery string, requestObj, responseObj interface{}) error { var body io.Reader = http.NoBody if requestObj != nil { outData, err := json.Marshal(requestObj) if err != nil { return err } body = bytes.NewBuffer(outData) } request, err := http.NewRequestWithContext(ctx, method, stringsutil.Concat(f.endpoint, "/api/fleet/", pathWithQuery), body) if err != nil { return err } // Sets headers allowing ES to distinguish between deprecated APIs used internally and by the user if request.Header == nil { request.Header = make(http.Header) } request.Header.Set(commonhttp.InternalProductRequestHeaderKey, commonhttp.InternalProductRequestHeaderValue) 
request.Header.Set("kbn-xsrf", "true") request.SetBasicAuth(f.username, f.password) f.log.V(1).Info( "Fleet API HTTP request", "method", request.Method, "url", request.URL.Redacted(), ) resp, err := f.client.Do(request) if err != nil { return err } defer resp.Body.Close() if err := commonhttp.MaybeAPIError(resp); err != nil { return err } if responseObj != nil { if err := json.NewDecoder(resp.Body).Decode(responseObj); err != nil { return err } } return nil } func (f fleetAPI) enrollmentAPIKeyPath() string { path := "enrollment_api_keys" if strings.HasPrefix(f.kibanaVersion, "7") { path = "enrollment-api-keys" } return path } func (f fleetAPI) createEnrollmentAPIKey(ctx context.Context, policyID string) (EnrollmentAPIKey, error) { var response EnrollmentAPIKeyResult err := f.request(ctx, http.MethodPost, f.enrollmentAPIKeyPath(), EnrollmentAPIKey{PolicyID: policyID}, &response) return response.Item, err } func (f fleetAPI) getEnrollmentAPIKey(ctx context.Context, keyID string) (EnrollmentAPIKey, error) { var response EnrollmentAPIKeyResult err := f.request(ctx, http.MethodGet, fmt.Sprintf("%s/%s", f.enrollmentAPIKeyPath(), keyID), nil, &response) return response.Item, err } func (f fleetAPI) findAgentPolicy(ctx context.Context, filter func(policy Policy) bool) (Policy, error) { page := 1 for { var list PolicyList if err := f.request(ctx, http.MethodGet, fmt.Sprintf("agent_policies?perPage=20&page=%d", page), nil, &list); err != nil { return Policy{}, err } if len(list.Items) == 0 { break } for _, p := range list.Items { if filter(p) { return p, nil } } page++ } return Policy{}, errors.New("no matching agent policy found") } func (f fleetAPI) findEnrollmentAPIKey(ctx context.Context, policyID string) (EnrollmentAPIKey, error) { page := 1 for { var list EnrollmentAPIKeyList if err := f.request(ctx, http.MethodGet, fmt.Sprintf("%s?perPage=20&page=%d", f.enrollmentAPIKeyPath(), page), nil, &list); err != nil { return EnrollmentAPIKey{}, err } if len(list.Items) == 0 { 
break } for _, t := range list.Items { if t.Active && t.PolicyID == policyID { return t, nil } } page++ } return EnrollmentAPIKey{}, errNoMatchingTokenFound } func (f fleetAPI) defaultFleetServerPolicyID(ctx context.Context) (string, error) { policy, err := f.findAgentPolicy(ctx, func(policy Policy) bool { return policy.IsDefaultFleetServer && policy.Status == "active" }) if err != nil { return "", err } return policy.ID, nil } func (f fleetAPI) defaultAgentPolicyID(ctx context.Context) (string, error) { policy, err := f.findAgentPolicy(ctx, func(policy Policy) bool { return policy.IsDefault && policy.Status == "active" }) if err != nil { return "", err } return policy.ID, nil } func (f fleetAPI) setupFleet(ctx context.Context) error { return f.request(ctx, http.MethodPost, "setup", nil, nil) } func maybeReconcileFleetEnrollment(params Params, result *reconciler.Results) EnrollmentAPIKey { if !params.Agent.Spec.KibanaRef.IsDefined() { return EnrollmentAPIKey{} } log := params.Logger() reachable, err := isKibanaReachable(params.Context, params.Client, params.Agent.Spec.KibanaRef.WithDefaultNamespace(params.Agent.Namespace).NamespacedName()) if err != nil { result.WithError(err) return EnrollmentAPIKey{} } if !reachable { // we requeue if Kibana is unavailable: surface this condition to the user message := "Delaying deployment of Elastic Agent in Fleet Mode as Kibana is not available yet" log.Info(message) params.EventRecorder.Event(&params.Agent, corev1.EventTypeWarning, events.EventReasonDelayed, message) result.WithResult(reconcile.Result{Requeue: true}) return EnrollmentAPIKey{} } kbConnectionSettings, err := extractClientConnectionSettings(params.Context, params.Agent, params.Client, commonv1.KibanaAssociationType) if err != nil { result.WithError(err) return EnrollmentAPIKey{} } token, err := reconcileEnrollmentToken( params, newFleetAPI( params.OperatorParams.Dialer, kbConnectionSettings, log), ) switch { case commonhttp.IsUnauthorized(err): message := "ECK 
cannot setup Fleet enrollment. Waiting for Kibana credentials. This should be a transient issue." log.V(1).Info(err.Error()) log.Info(message) params.EventRecorder.Event(&params.Agent, corev1.EventTypeWarning, events.EventReasonDelayed, message) result.WithResult(reconcile.Result{Requeue: true}) case commonhttp.IsNotFound(err): message := fmt.Sprintf("ECK cannot setup Fleet enrollment. This is likely a mis-configuration. %s", err.Error()) log.Info(message) params.EventRecorder.Event(&params.Agent, corev1.EventTypeWarning, events.EventReasonUnexpected, message) result.WithResult(reconcile.Result{Requeue: true}) case err != nil: result.WithError(err) } return token } func isKibanaReachable(ctx context.Context, client k8s.Client, kibanaNSN types.NamespacedName) (bool, error) { var kb v1.Kibana err := client.Get(ctx, kibanaNSN, &kb) if err != nil { return false, err } if kb.Status.Health != commonv1.GreenHealth { return false, nil // requeue } return true, nil } func reconcileEnrollmentToken(params Params, api fleetAPI) (EnrollmentAPIKey, error) { defer api.client.CloseIdleConnections() agent := params.Agent ctx := params.Context // do we have an existing token that we have rolled out previously? tokenName, exists := agent.Annotations[FleetTokenAnnotation] if !exists { // setup fleet to create default policies (and tokens) if err := api.setupFleet(ctx); err != nil { return EnrollmentAPIKey{}, err } } // what policy should we enroll this agent in? 
policyID, err := findPolicyID(ctx, params.EventRecorder, agent, api) if err != nil { return EnrollmentAPIKey{}, err } if exists { // get the enrollment token identified by the annotation key, err := api.getEnrollmentAPIKey(ctx, tokenName) // the annotation might contain corrupted or no longer valid information if err != nil && commonhttp.IsNotFound(err) { goto FindOrCreate } if err != nil { return EnrollmentAPIKey{}, err } // if the token is valid and for the right policy we are done here if key.Active && key.PolicyID == policyID { return key, nil } } FindOrCreate: key, err := api.findEnrollmentAPIKey(ctx, policyID) if err != nil && errors.Is(err, errNoMatchingTokenFound) { ulog.FromContext(ctx).Info("Could not find existing Fleet enrollment API keys, creating new one", "error", err.Error()) key, err = api.createEnrollmentAPIKey(ctx, policyID) if err != nil { return EnrollmentAPIKey{}, err } } if err != nil { return EnrollmentAPIKey{}, err } // this potentially creates conflicts we could introduce reconciler state similar to the ES controller and handle it on the top level if agent.Annotations == nil { agent.Annotations = map[string]string{} } agent.Annotations[FleetTokenAnnotation] = key.ID err = params.Client.Update(ctx, &agent) if err != nil { return EnrollmentAPIKey{}, err } return key, nil } func findPolicyID(ctx context.Context, recorder record.EventRecorder, agent agentv1alpha1.Agent, api fleetAPI) (string, error) { if agent.Spec.PolicyID != "" { return agent.Spec.PolicyID, nil } recorder.Event(&agent, corev1.EventTypeWarning, events.EventReasonValidation, agentv1alpha1.MissingPolicyIDMessage) ulog.FromContext(ctx).Info(agentv1alpha1.MissingPolicyIDMessage) if agent.Spec.FleetServerEnabled { return api.defaultFleetServerPolicyID(ctx) } return api.defaultAgentPolicyID(ctx) }