internal/pkg/monitor/monitor.go

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// Package monitor provides a way to track new/updated documents in an Elasticsearch index.
package monitor

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/elastic/fleet-server/v7/internal/pkg/dl"
	"github.com/elastic/fleet-server/v7/internal/pkg/dsl"
	"github.com/elastic/fleet-server/v7/internal/pkg/es"
	"github.com/elastic/fleet-server/v7/internal/pkg/gcheckpt"
	"github.com/elastic/fleet-server/v7/internal/pkg/sleep"
	"github.com/elastic/fleet-server/v7/internal/pkg/sqn"

	"github.com/rs/zerolog"
	"go.elastic.co/apm/v2"

	"github.com/elastic/go-elasticsearch/v8"
)

const (
	defaultPollTimeout    = 4 * time.Minute // default long poll timeout
	defaultSeqNo          = int64(-1)       //nolint:deadcode,varcheck // the _seq_no in elasticsearch starts with 0
	defaultWithExpiration = false

	// Making the default fetch size larger in order to increase the throughput of the monitor.
	// This is configurable as well, so it can be adjusted based on the memory size of the container if needed.
	// It seems that smaller actions, with one or a few agents per action document, will be more prevalent in the future.
	// For example, as of now the size of an osquery action JSON document for 1000 agents is 40KB.
	// Assuming the worst-case scenario of 1000 documents fetched, we are looking at a 50MB slice.
	// One action can be split up into multiple documents, up to the 1000 agents per action, if needed.
	defaultFetchSize = 1000

	// Retry delay on error waiting on the global checkpoint update.
	// This is the wait time between requests to elasticsearch in case:
	// 1. The index is not found (the index is created only on the first document save).
	// 2. Any other error waiting on the global checkpoint, except timeouts.
	// For the long poll timeout, start a new request as soon as possible.
	retryDelay = 10 * time.Second
)

const (
	seqNoPrimaryTerm = "seq_no_primary_term"

	fieldSeqNo      = "_seq_no"
	fieldMaxSeqNo   = "max_seq_no"
	fieldExpiration = "expiration"
)

// GlobalCheckpointProvider provides SeqNo.
type GlobalCheckpointProvider interface {
	GetCheckpoint() sqn.SeqNo
}

// BaseMonitor is the monitor interface implemented by SimpleMonitor and Monitor.
type BaseMonitor interface {
	GlobalCheckpointProvider

	// Run runs the monitor
	Run(ctx context.Context) error
}

// SimpleMonitor monitors for new documents in an index.
type SimpleMonitor interface {
	BaseMonitor

	// Output is the channel the monitor sends new documents to
	Output() <-chan []es.HitT
}

// simpleMonitorT monitors for new documents in an index.
type simpleMonitorT struct {
	esCli  *elasticsearch.Client
	monCli *elasticsearch.Client
	tracer *apm.Tracer

	tmplCheck *dsl.Tmpl
	tmplQuery *dsl.Tmpl

	index          string
	pollTimeout    time.Duration
	withExpiration bool
	fetchSize      int
	debounceTime   time.Duration

	checkpoint sqn.SeqNo    // index global checkpoint
	mx         sync.RWMutex // checkpoint mutex

	log zerolog.Logger

	outCh   chan []es.HitT
	readyCh chan error
}

// Option is a functional configuration option.
type Option func(SimpleMonitor)

// NewSimple creates a new SimpleMonitor.
func NewSimple(index string, esCli, monCli *elasticsearch.Client, opts ...Option) (SimpleMonitor, error) {
	m := &simpleMonitorT{
		index:          index,
		esCli:          esCli,
		monCli:         monCli,
		tracer:         nil,
		pollTimeout:    defaultPollTimeout,
		withExpiration: defaultWithExpiration,
		fetchSize:      defaultFetchSize,
		debounceTime:   0,
		checkpoint:     sqn.DefaultSeqNo,
		outCh:          make(chan []es.HitT, 1),
	}

	for _, opt := range opts {
		opt(m)
	}

	tmplCheck, err := m.prepareCheckQuery()
	if err != nil {
		return nil, err
	}
	m.tmplCheck = tmplCheck

	tmplQuery, err := m.prepareQuery()
	if err != nil {
		return nil, err
	}
	m.tmplQuery = tmplQuery

	return m, nil
}

// WithFetchSize sets the fetch size of the monitor.
func WithFetchSize(fetchSize int) Option {
	return func(m SimpleMonitor) {
		if fetchSize > 0 {
			m.(*simpleMonitorT).fetchSize = fetchSize
		}
	}
}

// WithPollTimeout sets the global checkpoint polling timeout.
func WithPollTimeout(to time.Duration) Option {
	return func(m SimpleMonitor) {
		m.(*simpleMonitorT).pollTimeout = to
	}
}

// WithExpiration adds the expiration field to the monitor query.
func WithExpiration(withExpiration bool) Option {
	return func(m SimpleMonitor) {
		m.(*simpleMonitorT).withExpiration = withExpiration
	}
}

// WithReadyChan allows passing a channel that is signaled when the monitor is ready.
func WithReadyChan(readyCh chan error) Option {
	return func(m SimpleMonitor) {
		m.(*simpleMonitorT).readyCh = readyCh
	}
}

// WithAPMTracer attaches an APM tracer to the monitor.
func WithAPMTracer(tracer *apm.Tracer) Option {
	return func(m SimpleMonitor) {
		m.(*simpleMonitorT).tracer = tracer
	}
}

// WithDebounceTime sets a debounce duration applied at the end of each monitor loop iteration.
func WithDebounceTime(dur time.Duration) Option {
	return func(m SimpleMonitor) {
		m.(*simpleMonitorT).debounceTime = dur
	}
}

// Output returns the output channel for the monitor.
func (m *simpleMonitorT) Output() <-chan []es.HitT {
	return m.outCh
}

// GetCheckpoint implements the GlobalCheckpointProvider interface.
func (m *simpleMonitorT) GetCheckpoint() sqn.SeqNo {
	return m.loadCheckpoint()
}

func (m *simpleMonitorT) storeCheckpoint(val sqn.SeqNo) {
	m.log.Debug().Ints64("checkpoints", val).Msg("updated checkpoint")
	m.mx.Lock()
	defer m.mx.Unlock()
	m.checkpoint = val.Clone()
}

func (m *simpleMonitorT) loadCheckpoint() sqn.SeqNo {
	m.mx.RLock()
	defer m.mx.RUnlock()
	return m.checkpoint.Clone()
}
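// Example wiring of a SimpleMonitor (an illustrative sketch, not part of the
// original file; the index name, context, and client variables are assumptions):
//
//	readyCh := make(chan error)
//	mon, err := NewSimple(".fleet-actions", esCli, monCli,
//		WithReadyChan(readyCh),
//		WithPollTimeout(2*time.Minute),
//	)
//	if err != nil {
//		return err
//	}
//	go func() { _ = mon.Run(ctx) }()
//	if err := <-readyCh; err != nil {
//		return err
//	}
//	for hits := range mon.Output() {
//		// handle the newly indexed documents
//		_ = hits
//	}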
// Run runs the monitor.
func (m *simpleMonitorT) Run(ctx context.Context) (err error) {
	m.log = zerolog.Ctx(ctx).With().Str("index", m.index).Str("ctx", "index monitor").Logger()
	m.log.Info().Msg("starting index monitor")
	defer func() {
		if errors.Is(err, context.Canceled) {
			err = nil
		}
		m.log.Info().Err(err).Msg("index monitor exited")
	}()

	defer func() {
		if m.readyCh != nil {
			m.readyCh <- err
		}
	}()

	// Get the initial global checkpoint
	var trans *apm.Transaction
	for {
		if m.tracer != nil {
			trans = m.tracer.StartTransaction("Query fleet global checkpoint", "monitor")
			ctx = apm.ContextWithTransaction(ctx, trans)
		}
		span, sCtx := apm.StartSpan(ctx, "global_checkpoint", "fleet_global_checkpoints")
		checkpoint, err := gcheckpt.Query(sCtx, m.monCli, m.index)
		span.End()
		if err != nil {
			m.log.Warn().Err(err).Msg("failed to initialize the global checkpoints, will retry")
			err = sleep.WithContext(ctx, retryDelay)
			if err != nil {
				if m.tracer != nil {
					trans.End()
				}
				return err
			}
			if m.tracer != nil {
				trans.End()
			}
			continue
		}

		m.storeCheckpoint(checkpoint)
		m.log.Debug().Ints64("checkpoint", checkpoint).Msg("initial checkpoint")
		if m.tracer != nil {
			trans.End()
		}
		break
	}

	// Signal that the monitor is ready
	if m.readyCh != nil {
		m.readyCh <- nil
		m.readyCh = nil
	}

	for {
		if m.tracer != nil {
			trans = m.tracer.StartTransaction(fmt.Sprintf("Monitor index %s", m.index), "monitor")
			ctx = apm.ContextWithTransaction(ctx, trans)
		}

		checkpoint := m.loadCheckpoint()

		// Wait for the checkpoint to advance, long poll.
		// It returns only if there are new documents fully indexed with a _seq_no greater
		// than the passed checkpoint value, or on the timeout (long poll interval).
		span, gCtx := apm.StartSpan(ctx, "global_checkpoint", "wait_for_advance")
		newCheckpoint, err := gcheckpt.WaitAdvance(gCtx, m.monCli, m.index, checkpoint, m.pollTimeout)
		span.End()
		if err != nil {
			if errors.Is(err, es.ErrIndexNotFound) {
				// Wait until the index is created
				m.log.Debug().Msgf("index not found, poll again in %v", retryDelay)
			} else if errors.Is(err, es.ErrTimeout) {
				// Timed out, wait again
				m.log.Debug().Msg("timeout on global checkpoints advance, poll again")
				// Loop back to the checkpoint "wait advance" without delay
				if m.tracer != nil {
					trans.End()
				}
				continue
			} else if errors.Is(err, context.Canceled) {
				m.log.Info().Msg("context closed waiting for global checkpoints advance")
				// Exit run
				if m.tracer != nil {
					trans.End()
				}
				return err
			} else {
				// Log the error and keep trying
				m.log.Info().Err(err).Msg("failed on waiting for global checkpoints advance")
			}

			// Delay the next attempt
			err = sleep.WithContext(ctx, retryDelay)
			if err != nil {
				if m.tracer != nil {
					trans.End()
				}
				return err
			}
			// Loop back to the checkpoint "wait advance" after the retry delay
			if m.tracer != nil {
				trans.End()
			}
			continue
		}

		// This is an example of the steps for fetching the documents without "holes"
		// (not-yet-indexed documents in between), as recommended by the Elasticsearch
		// team on August 25th, 2021:
		// 1. Call global checkpoints = 5
		// 2. Search = 1, 2, 3, 5.
		// 3. Manual refresh
		// 4. Search and get 4, 5
		// 5. Return to step 1
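		// To make the loop below concrete (hypothetical numbers, not from a real run):
		// with fetchSize=1000 and 2500 new documents between the checkpoints, the
		// fetches return 1000, 1000, and then 500 hits. The first two iterations
		// return exactly fetchSize, so the loop repeats; the final short fetch
		// (500 < 1000) ends the loop and stores the "wait advance" checkpoint.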
		// Fetch up to the new checkpoint.
		//
		// The fetch happens at least once.
		// The fetch repeats until there are no more documents to fetch.

		// Set count to the max fetch size (m.fetchSize) initially, so the fetch happens at least once.
		count := m.fetchSize
		for count == m.fetchSize {
			// Fetch the documents between the last known checkpoint and the new checkpoint value received from "wait advance".
			hits, err := m.fetch(ctx, checkpoint, newCheckpoint)
			if err != nil {
				m.log.Error().Err(err).Msg("failed checking new documents")
				if m.tracer != nil {
					trans.End()
				}
				break
			}

			// The notify call updates m.checkpoint as max(_seq_no) from the fetched hits
			count = m.notify(ctx, hits)
			m.log.Debug().Int("count", count).Msg("hits found after notify")

			// If the number of fetched documents is the same as the max fetch size, then it's possible there are more documents to fetch.
			if count == m.fetchSize {
				// Get the latest checkpoint value for the next fetch iteration.
				checkpoint = m.loadCheckpoint()
			} else {
				// If the number of fetched documents is less than the max fetch size, then it is the final fetch for the new checkpoint.
				// Update the monitor checkpoint value from the checkpoint "wait advance" response.
				//
				// This avoids the situation where the actions monitor checkpoint gets out of sync with the index checkpoint,
				// due to the index checkpoint being incremented by elasticsearch upon deleting a document.
				//
				// This fixes the issue https://github.com/elastic/fleet-server/issues/2205.
				// The root cause of the issue was that the monitor implementation was not correctly accounting for the index
				// checkpoint increment when an action document is deleted from the index.
				m.storeCheckpoint(newCheckpoint)
			}
		}
		if m.tracer != nil {
			trans.End()
		}

		if m.debounceTime > 0 {
			m.log.Debug().Dur("debounce_time", m.debounceTime).Msg("monitor debounce start")
			// Introduce a debounce time before "wait advance" (the signal for new docs in the index).
			// This is specifically done so we can introduce a delay for cases like rapid policy changes,
			// where fleet-server may not have finished dispatching policies to all agents when a new change is detected.
			err := sleep.WithContext(ctx, m.debounceTime)
			if err != nil {
				return err
			}
		}
	}
}

func (m *simpleMonitorT) notify(ctx context.Context, hits []es.HitT) int {
	sz := len(hits)
	if sz > 0 {
		select {
		case m.outCh <- hits:
			maxVal := hits[sz-1].SeqNo
			m.storeCheckpoint([]int64{maxVal})
			return sz
		case <-ctx.Done():
		}
	}
	return 0
}

func (m *simpleMonitorT) fetch(ctx context.Context, checkpoint, maxCheckpoint sqn.SeqNo) ([]es.HitT, error) {
	now := time.Now().UTC().Format(time.RFC3339)

	// Run the query that detects whether there are new documents available
	params := map[string]interface{}{
		dl.FieldSeqNo:    checkpoint.Value(),
		dl.FieldMaxSeqNo: maxCheckpoint.Value(),
	}
	if m.withExpiration {
		params[dl.FieldExpiration] = now
	}

	hits, err := m.search(ctx, m.tmplQuery, params, maxCheckpoint)
	if err != nil {
		return nil, err
	}
	return hits, nil
}

func (m *simpleMonitorT) search(ctx context.Context, tmpl *dsl.Tmpl, params map[string]interface{}, seqNos sqn.SeqNo) ([]es.HitT, error) {
	span, ctx := apm.StartSpan(ctx, "action", "fleet_search")
	defer span.End()
	query, err := tmpl.Render(params)
	if err != nil {
		return nil, err
	}

	res, err := m.esCli.FleetSearch(
		m.index,
		m.esCli.FleetSearch.WithContext(ctx),
		m.esCli.FleetSearch.WithBody(bytes.NewBuffer(query)),
		m.esCli.FleetSearch.WithWaitForCheckpoints(seqNos.String()),
	)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	var esres es.Response
	err = json.NewDecoder(res.Body).Decode(&esres)
	if err != nil {
		return nil, err
	}

	if res.IsError() {
		err = es.TranslateError(res.StatusCode, esres.Error)
	}

	if err != nil {
		if errors.Is(err, es.ErrIndexNotFound) {
			m.log.Debug().Msg(es.ErrIndexNotFound.Error())
			return nil, nil
		}
		return nil, err
	}

	return esres.Hits.Hits, nil
}

// prepareCheckQuery prepares a minimal query for the quick check, without reading the full documents of all matches.
func (m *simpleMonitorT) prepareCheckQuery() (*dsl.Tmpl, error) {
	tmpl, root := m.prepareCommon(false)

	root.Source().Includes(dl.FieldSeqNo)
	root.Size(1)

	if err := tmpl.Resolve(root); err != nil {
		return nil, err
	}
	return tmpl, nil
}

// prepareQuery prepares the full documents query.
func (m *simpleMonitorT) prepareQuery() (*dsl.Tmpl, error) {
	tmpl, root := m.prepareCommon(true)
	root.Size(uint64(m.fetchSize)) //nolint:gosec // disable G115
	root.Sort().SortOrder(fieldSeqNo, dsl.SortAscend)

	if err := tmpl.Resolve(root); err != nil {
		return nil, err
	}
	return tmpl, nil
}

func (m *simpleMonitorT) prepareCommon(limitMax bool) (*dsl.Tmpl, *dsl.Node) {
	tmpl := dsl.NewTmpl()

	root := dsl.NewRoot()
	root.Param(seqNoPrimaryTerm, true)

	filter := root.Query().Bool().Filter()
	filter.Range(fieldSeqNo, dsl.WithRangeGT(tmpl.Bind(fieldSeqNo)))
	if limitMax {
		filter.Range(fieldSeqNo, dsl.WithRangeLTE(tmpl.Bind(fieldMaxSeqNo)))
	}
	if m.withExpiration {
		filter.Range(fieldExpiration, dsl.WithRangeGT(tmpl.Bind(fieldExpiration)))
	}

	return tmpl, root
}
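// For reference, the full-documents query built by prepareQuery/prepareCommon
// renders to roughly the following JSON (a hand-written approximation for
// illustration; the exact output of the internal dsl package may differ in detail):
//
//	{
//	  "seq_no_primary_term": true,
//	  "size": 1000,
//	  "sort": [{"_seq_no": "asc"}],
//	  "query": {
//	    "bool": {
//	      "filter": [
//	        {"range": {"_seq_no": {"gt": <checkpoint>}}},
//	        {"range": {"_seq_no": {"lte": <max_seq_no>}}}
//	      ]
//	    }
//	  }
//	}
//
// The gt/lte bounds are bound at render time to the last known checkpoint and
// the "wait advance" checkpoint respectively, with an optional expiration range
// filter added when WithExpiration is enabled.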