internal/pkg/dl/actions.go (177 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package dl import ( "bytes" "context" "encoding/json" "errors" "time" "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dsl" "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/sqn" "github.com/rs/zerolog" ) const ( FieldAgents = "agents" FieldExpiration = "expiration" FieldSize = "size" maxAgentActionsFetchSize = 100 ) var ( QueryAction = prepareFindAction() QueryAllAgentActions = prepareFindAllAgentsActions() QueryAgentActions = prepareFindAgentActions() // Query for expired actions GC QueryDeleteExpiredActions = prepareDeleteExpiredAction() QueryFindExpiredActions = prepareFindExpiredAction() ) func prepareFindAllAgentsActions() *dsl.Tmpl { tmpl, root, _ := createBaseActionsQuery() tmpl.MustResolve(root) return tmpl } func prepareFindAction() *dsl.Tmpl { tmpl := dsl.NewTmpl() root := dsl.NewRoot() filter := root.Query().Bool().Filter() filter.Term(FieldActionID, tmpl.Bind(FieldActionID), nil) root.Source().Excludes(FieldAgents) tmpl.MustResolve(root) return tmpl } func prepareDeleteExpiredAction() *dsl.Tmpl { tmpl := dsl.NewTmpl() root := dsl.NewRoot() filter := root.Query().Bool().Filter() filter.Range(FieldExpiration, dsl.WithRangeLTE(tmpl.Bind(FieldExpiration))) tmpl.MustResolve(root) return tmpl } func prepareFindExpiredAction() *dsl.Tmpl { tmpl := dsl.NewTmpl() root := dsl.NewRoot() filter := root.Query().Bool().Filter() filter.Range(FieldExpiration, dsl.WithRangeLTE(tmpl.Bind(FieldExpiration))) // Select only acton ids for deletion root.Source().Includes("_id") root.WithSize(tmpl.Bind(FieldSize)) tmpl.MustResolve(root) return tmpl } func prepareFindAgentActions() *dsl.Tmpl { tmpl, root, filter := createBaseActionsQuery() filter.Terms(FieldAgents, tmpl.Bind(FieldAgents), nil) // Select more actions per agent since the agents array is not loaded root.Size(maxAgentActionsFetchSize) root.Source().Excludes(FieldAgents) tmpl.MustResolve(root) return tmpl } func createBaseActionsQuery() (tmpl *dsl.Tmpl, root, filter *dsl.Node) { tmpl = dsl.NewTmpl() root = dsl.NewRoot() root.Param(seqNoPrimaryTerm, true) filter = root.Query().Bool().Filter() filter.Range(FieldSeqNo, dsl.WithRangeGT(tmpl.Bind(FieldSeqNo))) filter.Range(FieldSeqNo, dsl.WithRangeLTE(tmpl.Bind(FieldMaxSeqNo))) filter.Range(FieldExpiration, dsl.WithRangeGT(tmpl.Bind(FieldExpiration))) root.Sort().SortOrder(FieldSeqNo, dsl.SortAscend) return //nolint:nakedret // simple function } func FindAction(ctx context.Context, bulker bulk.Bulk, id string, opts ...Option) ([]model.Action, error) { o := newOption(FleetActions, opts...) return findActions(ctx, bulker, QueryAction, o.indexName, map[string]interface{}{ FieldActionID: id, }, nil) } func FindAgentActions(ctx context.Context, bulker bulk.Bulk, minSeqNo, maxSeqNo sqn.SeqNo, agentID string) ([]model.Action, error) { const index = FleetActions params := map[string]interface{}{ FieldSeqNo: minSeqNo.Value(), FieldMaxSeqNo: maxSeqNo.Value(), FieldExpiration: time.Now().UTC().Format(time.RFC3339), FieldAgents: []string{agentID}, } res, err := findActionsHits(ctx, bulker, QueryAgentActions, index, params, maxSeqNo) if err != nil || res == nil { return nil, err } return hitsToActions(res.Hits) } func DeleteExpiredForIndex(ctx context.Context, index string, bulker bulk.Bulk, cleanupIntervalAfterExpired string) (int64, error) { params := map[string]interface{}{ FieldExpiration: "now-" + cleanupIntervalAfterExpired, } query, err := QueryDeleteExpiredActions.Render(params) if err != nil { return 0, err } res, err := bulker.Client().API.DeleteByQuery([]string{index}, bytes.NewReader(query), bulker.Client().API.DeleteByQuery.WithContext(ctx)) if err != nil { return 0, err } defer res.Body.Close() var esres es.DeleteByQueryResponse err = json.NewDecoder(res.Body).Decode(&esres) if err != nil { return 0, err } if res.IsError() { err = es.TranslateError(res.StatusCode, esres.Error) if err != nil { if errors.Is(err, es.ErrIndexNotFound) { zerolog.Ctx(ctx).Debug().Str("index", index).Msg(es.ErrIndexNotFound.Error()) err = nil } return 0, err } } return esres.Deleted, nil } func FindExpiredActionsHitsForIndex(ctx context.Context, index string, bulker bulk.Bulk, expiredBefore time.Time, size int) ([]es.HitT, error) { params := map[string]interface{}{ FieldExpiration: expiredBefore.UTC().Format(time.RFC3339), FieldSize: size, } res, err := findActionsHits(ctx, bulker, QueryFindExpiredActions, index, params, nil) if err != nil { return nil, err } if res != nil { return res.Hits, nil } return nil, nil } func findActionsHits(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, index string, params map[string]interface{}, seqNos []int64) (*es.HitsT, error) { var ops []bulk.Opt if len(seqNos) > 0 { ops = append(ops, bulk.WithWaitForCheckpoints(seqNos)) } res, err := Search(ctx, bulker, tmpl, index, params, ops...) if err != nil { if errors.Is(err, es.ErrIndexNotFound) { zerolog.Ctx(ctx).Debug().Str("index", index).Msg(es.ErrIndexNotFound.Error()) err = nil } return nil, err } return res, nil } func findActions(ctx context.Context, bulker bulk.Bulk, tmpl *dsl.Tmpl, index string, params map[string]interface{}, seqNos []int64) ([]model.Action, error) { res, err := findActionsHits(ctx, bulker, tmpl, index, params, seqNos) if err != nil || res == nil { return nil, err } return hitsToActions(res.Hits) } func hitsToActions(hits []es.HitT) ([]model.Action, error) { actions := make([]model.Action, 0, len(hits)) for _, hit := range hits { var action model.Action err := hit.Unmarshal(&action) if err != nil { return nil, err } actions = append(actions, action) } return actions, nil }