in internal/pkg/monitor/monitor.go [206:375]
func (m *simpleMonitorT) Run(ctx context.Context) (err error) {
	m.log = zerolog.Ctx(ctx).With().Str("index", m.index).Str("ctx", "index monitor").Logger()
	m.log.Info().Msg("starting index monitor")
	defer func() {
		if errors.Is(err, context.Canceled) {
			err = nil
		}
		m.log.Info().Err(err).Msg("index monitor exited")
	}()
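	// Report a startup failure on the ready channel. After a successful start readyCh is set to nil below,
	// so this send only happens if Run exits before readiness is signaled. Because defers run LIFO, this
	// runs before the cancellation check above, so a canceled context is still reported to the waiter.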
	defer func() {
		if m.readyCh != nil {
			m.readyCh <- err
		}
	}()
	// Get initial global checkpoint
	var trans *apm.Transaction
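	// Retry the initial checkpoint query until it succeeds; the loop exits with an error only if the
	// context is done while waiting to retry.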
	for {
		if m.tracer != nil {
			trans = m.tracer.StartTransaction("Query fleet global checkpoint", "monitor")
			ctx = apm.ContextWithTransaction(ctx, trans)
		}
		span, sCtx := apm.StartSpan(ctx, "global_checkpoint", "fleet_global_checkpoints")
		checkpoint, err := gcheckpt.Query(sCtx, m.monCli, m.index)
		span.End()
		if err != nil {
			m.log.Warn().Err(err).Msg("failed to initialize the global checkpoints, will retry")
			err = sleep.WithContext(ctx, retryDelay)
			if err != nil {
				if m.tracer != nil {
					trans.End()
				}
				return err
			}
			if m.tracer != nil {
				trans.End()
			}
			continue
		}
		m.storeCheckpoint(checkpoint)
		m.log.Debug().Ints64("checkpoint", checkpoint).Msg("initial checkpoint")
		if m.tracer != nil {
			trans.End()
		}
		break
	}
	// Signal the monitor is ready
	if m.readyCh != nil {
		m.readyCh <- nil
		m.readyCh = nil
	}
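	// Main monitoring loop: long poll for the global checkpoint to advance, then fetch the new
	// documents and notify subscribers.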
	for {
		if m.tracer != nil {
			trans = m.tracer.StartTransaction(fmt.Sprintf("Monitor index %s", m.index), "monitor")
			ctx = apm.ContextWithTransaction(ctx, trans)
		}
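		// Each iteration starts from the last checkpoint processed by this monitor.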
		checkpoint := m.loadCheckpoint()
		// Wait for the checkpoint to advance (long poll).
		// It returns only when there are new documents fully indexed with a _seq_no greater than the passed
		// checkpoint value, or when the timeout (long poll interval) elapses.
		span, gCtx := apm.StartSpan(ctx, "global_checkpoint", "wait_for_advance")
		newCheckpoint, err := gcheckpt.WaitAdvance(gCtx, m.monCli, m.index, checkpoint, m.pollTimeout)
		span.End()
		if err != nil {
			if errors.Is(err, es.ErrIndexNotFound) {
				// Wait until the index is created
				m.log.Debug().Msgf("index not found, poll again in %v", retryDelay)
			} else if errors.Is(err, es.ErrTimeout) {
				// Timed out, wait again
				m.log.Debug().Msg("timeout on global checkpoints advance, poll again")
				// Loop back to the checkpoint "wait advance" without delay
				if m.tracer != nil {
					trans.End()
				}
				continue
			} else if errors.Is(err, context.Canceled) {
				m.log.Info().Msg("context closed waiting for global checkpoints advance")
				// Exit Run
				if m.tracer != nil {
					trans.End()
				}
				return err
			} else {
				// Log the error and keep trying
				m.log.Info().Err(err).Msg("failed on waiting for global checkpoints advance")
			}
			// Delay the next attempt
			err = sleep.WithContext(ctx, retryDelay)
			if err != nil {
				if m.tracer != nil {
					trans.End()
				}
				return err
			}
			// Loop back to the checkpoint "wait advance" after the retry delay
			if m.tracer != nil {
				trans.End()
			}
			continue
		}
		// This is an example of the steps for fetching documents without "holes" (not-yet-indexed documents
		// in between), as recommended by the Elasticsearch team on August 25th, 2021:
		// 1. Query the global checkpoint; say it returns 5.
		// 2. Search; get documents 1, 2, 3, 5.
		// 3. Manually refresh the index.
		// 4. Search again; get 4 and 5.
		// 5. Return to step 1.
		//
		// Fetch up to the new checkpoint.
		// The fetch happens at least once and repeats until there are no more documents to fetch.
		// Set count to the max fetch size (m.fetchSize) initially so that the fetch happens at least once.
		count := m.fetchSize
		for count == m.fetchSize {
			// Fetch the documents between the last known checkpoint and the new checkpoint value received from "wait advance".
			hits, err := m.fetch(ctx, checkpoint, newCheckpoint)
			if err != nil {
				m.log.Error().Err(err).Msg("failed checking new documents")
				if m.tracer != nil {
					trans.End()
				}
				break
			}
			// The notify call updates m.checkpoint to the max _seq_no of the fetched hits.
			count = m.notify(ctx, hits)
			m.log.Debug().Int("count", count).Msg("hits found after notify")
			// If the number of fetched documents equals the max fetch size, there may be more documents to fetch.
			if count == m.fetchSize {
				// Get the latest checkpoint value for the next fetch iteration.
				checkpoint = m.loadCheckpoint()
			} else {
				// If the number of fetched documents is less than the max fetch size, then this is the final fetch
				// for the new checkpoint. Update the monitor checkpoint value from the checkpoint "wait advance" response.
				//
				// This avoids the situation where the actions monitor checkpoint gets out of sync with the index
				// checkpoint, due to the index checkpoint being incremented by Elasticsearch upon deleting a document.
				//
				// This fixes https://github.com/elastic/fleet-server/issues/2205.
				// The root cause of that issue was that the monitor implementation did not correctly account for the
				// index checkpoint increment when an action document is deleted from the index.
				m.storeCheckpoint(newCheckpoint)
			}
		}
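		// End the APM transaction started at the top of this iteration.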
		if m.tracer != nil {
			trans.End()
		}
		if m.debounceTime > 0 {
			m.log.Debug().Dur("debounce_time", m.debounceTime).Msg("monitor debounce start")
			// Introduce a debounce time before the next wait advance (the signal for new docs in the index).
			// This is done specifically to add a delay for cases like rapid policy changes, where fleet-server
			// may not have finished dispatching policies to all agents when a new change is detected.
			err := sleep.WithContext(ctx, m.debounceTime)
			if err != nil {
				return err
			}
		}
	}
}