internal/pkg/dl/policies.go (96 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package dl
import (
"context"
"encoding/json"
"errors"
"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/rs/zerolog"
"github.com/elastic/fleet-server/v7/internal/pkg/dsl"
)
var (
tmplQueryLatestPolicies = prepareQueryLatestPolicies()
ErrMissingAggregations = errors.New("missing expected aggregation result")
tmplQueryPolicies = prepareQueryPolicies()
)
func prepareQueryLatestPolicies() []byte {
root := dsl.NewRoot()
root.Size(0)
policyID := root.Aggs().Agg(FieldPolicyID)
policyID.Terms("field", FieldPolicyID, nil).Size(10000)
revisionIdx := policyID.Aggs().Agg(FieldRevisionIdx).TopHits()
revisionIdx.Size(1)
rSort := revisionIdx.Sort()
rSort.SortOrder(FieldRevisionIdx, dsl.SortDescend)
return root.MustMarshalJSON()
}
// QueryLatestPolicies gets the latest revision for a policy
func QueryLatestPolicies(ctx context.Context, bulker bulk.Bulk, opt ...Option) ([]model.Policy, error) {
o := newOption(FleetPolicies, opt...)
res, err := bulker.Search(ctx, o.indexName, tmplQueryLatestPolicies, bulk.WithIgnoreUnavailble())
if err != nil {
return nil, err
}
policyID, ok := res.Aggregations[FieldPolicyID]
if !ok {
// Aggregation will not be here if there index is not available
return []model.Policy{}, nil
}
if len(policyID.Buckets) == 0 {
return []model.Policy{}, nil
}
policies := make([]model.Policy, len(policyID.Buckets))
for i, bucket := range policyID.Buckets {
revisionIdx, ok := bucket.Aggregations[FieldRevisionIdx]
if !ok || len(revisionIdx.Hits) != 1 {
return nil, ErrMissingAggregations
}
hit := revisionIdx.Hits[0]
err = hit.Unmarshal(&policies[i])
if err != nil {
return nil, err
}
}
return policies, nil
}
// CreatePolicy creates a new policy in the index
func CreatePolicy(ctx context.Context, bulker bulk.Bulk, policy model.Policy, opt ...Option) (string, error) {
o := newOption(FleetPolicies, opt...)
data, err := json.Marshal(&policy)
if err != nil {
return "", err
}
return bulker.Create(ctx, o.indexName, "", data, bulk.WithRefresh())
}
func prepareQueryPolicies() *dsl.Tmpl {
tmpl := dsl.NewTmpl()
root := dsl.NewRoot()
root.Size(100)
root.Sort().SortOrder("@timestamp", "desc")
root.Source().Includes("data.outputs")
tmpl.MustResolve(root)
return tmpl
}
// query policies last updated, find the one with matching output
// can't filter on output in ES as the field is not mapped
func QueryOutputFromPolicy(ctx context.Context, bulker bulk.Bulk, outputName string, opt ...Option) (*model.Policy, error) {
o := newOption(FleetPolicies, opt...)
params := map[string]interface{}{}
res, err := Search(ctx, bulker, tmplQueryPolicies, o.indexName, params)
if err != nil {
if errors.Is(err, es.ErrIndexNotFound) {
zerolog.Ctx(ctx).Debug().Str("index", o.indexName).Msg(es.ErrIndexNotFound.Error())
err = nil
}
return nil, err
}
var policy model.Policy
for _, hit := range res.Hits {
err = hit.Unmarshal(&policy)
if err != nil {
return nil, err
}
if policy.Data.Outputs[outputName] != nil {
return &policy, nil
}
}
zerolog.Ctx(ctx).Debug().Str(logger.PolicyOutputName, outputName).Msg("policy with output not found")
return nil, nil
}