func (m *simpleMonitorT) Run()

in internal/pkg/monitor/monitor.go [206:375]


func (m *simpleMonitorT) Run(ctx context.Context) (err error) {
	m.log = zerolog.Ctx(ctx).With().Str("index", m.index).Str("ctx", "index monitor").Logger()
	m.log.Info().Msg("starting index monitor")
	defer func() {
		if errors.Is(err, context.Canceled) {
			err = nil
		}
		m.log.Info().Err(err).Msg("index monitor exited")
	}()

	defer func() {
		if m.readyCh != nil {
			m.readyCh <- err
		}
	}()

	// Get initial global checkpoint
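	// Retry until the query succeeds or the context is canceled.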
	var trans *apm.Transaction
	for {
		if m.tracer != nil {
			trans = m.tracer.StartTransaction("Query fleet global checkpoint", "monitor")
			ctx = apm.ContextWithTransaction(ctx, trans)
		}
		span, sCtx := apm.StartSpan(ctx, "global_checkpoint", "fleet_global_checkpoints")
		checkpoint, err := gcheckpt.Query(sCtx, m.monCli, m.index)
		span.End()
		if err != nil {
			m.log.Warn().Err(err).Msg("failed to initialize the global checkpoints, will retry")
			err = sleep.WithContext(ctx, retryDelay)
			if err != nil {
				if m.tracer != nil {
					trans.End()
				}
				return err
			}
			if m.tracer != nil {
				trans.End()
			}
			continue
		}

		m.storeCheckpoint(checkpoint)
		m.log.Debug().Ints64("checkpoint", checkpoint).Msg("initial checkpoint")

		if m.tracer != nil {
			trans.End()
		}
		break
	}

	// Signal the monitor is ready
	if m.readyCh != nil {
		m.readyCh <- nil
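		// Clear the channel so the deferred error report above is skipped on exit;
		// readyCh only reports startup success or failure once.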
		m.readyCh = nil
	}

	for {
		if m.tracer != nil {
			trans = m.tracer.StartTransaction(fmt.Sprintf("Monitor index %s", m.index), "monitor")
			ctx = apm.ContextWithTransaction(ctx, trans)
		}
		checkpoint := m.loadCheckpoint()

		// Wait for checkpoint advance, long poll.
		// It returns when there are new documents fully indexed with _seq_no greater than the passed checkpoint value,
		// or when the timeout (long poll interval) elapses.
		span, gCtx := apm.StartSpan(ctx, "global_checkpoint", "wait_for_advance")
		newCheckpoint, err := gcheckpt.WaitAdvance(gCtx, m.monCli, m.index, checkpoint, m.pollTimeout)
		span.End()
		if err != nil {
			if errors.Is(err, es.ErrIndexNotFound) {
				// Wait until created
				m.log.Debug().Msgf("index not found, poll again in %v", retryDelay)
			} else if errors.Is(err, es.ErrTimeout) {
				// Timed out, wait again
				m.log.Debug().Msg("timeout on global checkpoints advance, poll again")
				// Loop back to the checkpoint "wait advance" without delay
				if m.tracer != nil {
					trans.End()
				}
				continue
			} else if errors.Is(err, context.Canceled) {
				m.log.Info().Msg("context closed waiting for global checkpoints advance")
				// Exit run
				if m.tracer != nil {
					trans.End()
				}
				return err
			} else {
				// Log the error and keep trying
				m.log.Info().Err(err).Msg("failed on waiting for global checkpoints advance")
			}

			// Delay next attempt
			err = sleep.WithContext(ctx, retryDelay)
			if err != nil {
				if m.tracer != nil {
					trans.End()
				}
				return err
			}
			// Loop back to the checkpoint "wait advance" after the retry delay
			if m.tracer != nil {
				trans.End()
			}
			continue
		}

		// This is an example of the steps for fetching the documents without "holes" (not-yet-indexed documents in between),
		// as recommended by the Elasticsearch team on August 25th, 2021 (a standalone sketch of this cycle follows the function):
		// 1. Call Global checkpoints = 5
		// 2. Search = 1, 2, 3, 5.
		// 3. Manual refresh
		// 4. Search and get 4,5
		// 5. Return to step 1

		// Fetch up to the new checkpoint.
		//
		// The fetch happens at least once.
		// The fetch repeats until there are no more documents to fetch.

		// Set count to max fetch size (m.fetchSize) initially, so the fetch happens at least once.
		count := m.fetchSize
		for count == m.fetchSize {
			// Fetch the documents between the last known checkpoint and the new checkpoint value received from "wait advance".
			hits, err := m.fetch(ctx, checkpoint, newCheckpoint)
			if err != nil {
				m.log.Error().Err(err).Msg("failed checking new documents")
				if m.tracer != nil {
					trans.End()
				}
				break
			}

			// Notify call updates m.checkpoint as max(_seq_no) from the fetched hits
			count = m.notify(ctx, hits)
			m.log.Debug().Int("count", count).Msg("hits found after notify")

			// If the number of fetched documents is the same as the max fetch size, then it's possible there are more documents to fetch.
			if count == m.fetchSize {
				// Get the latest checkpoint value for the next fetch iteration.
				checkpoint = m.loadCheckpoint()
			} else {
				// If the number of fetched documents is less than the max fetch size, then this is the final fetch for the new checkpoint.
				// Update the monitor checkpoint value from the checkpoint "wait advance" response.
				//
				// This avoids the situation where the actions monitor checkpoint gets out of sync with the index checkpoint,
				// due to the index checkpoint being incremented by elasticsearch upon deleting the document.
				//
				// This fixes the issue https://github.com/elastic/fleet-server/issues/2205.
				// The root cause of the issue was that the monitor implementation did not correctly account for the index
				// checkpoint increment that occurs when the action document is deleted from the index.
				m.storeCheckpoint(newCheckpoint)
			}
		}
		if m.tracer != nil {
			trans.End()
		}
		if m.debounceTime > 0 {
			m.log.Debug().Dur("debounce_time", m.debounceTime).Msg("monitor debounce start")
			// Introduce a debounce time before wait advance (the signal for new docs in the index).
			// This is specifically done to introduce a delay for cases like rapid policy changes,
			// where fleet-server may not have finished dispatching policies to all agents when a new change is detected.
			err := sleep.WithContext(ctx, m.debounceTime)
			if err != nil {
				return err
			}
		}
	}
}
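
The five-step "no holes" recipe in the comment above can be illustrated in isolation. The sketch below is an assumption for illustration only: the monitorsketch package, the seqNoSearcher interface, and fetchNoHoles are invented names, not the fleet-server API; in the real code the equivalent work is split between gcheckpt.WaitAdvance and the monitor's fetch/notify loop.

// Hypothetical, self-contained sketch of the "no holes" fetch cycle.
// All names here are invented for illustration.
package monitorsketch

import "context"

// seqNoSearcher is a hypothetical abstraction over the two index operations
// the cycle needs: searching a _seq_no range and forcing a manual refresh so
// already-indexed but not-yet-searchable documents become visible.
type seqNoSearcher interface {
	SearchSeqNoRange(ctx context.Context, after, upTo int64) ([]int64, error)
	Refresh(ctx context.Context) error
}

// fetchNoHoles collects the _seq_no values in (after, upTo], where upTo is the
// global checkpoint obtained in step 1 by the caller. The caller loops back to
// step 1 with the next checkpoint (step 5).
func fetchNoHoles(ctx context.Context, s seqNoSearcher, after, upTo int64) ([]int64, error) {
	seen := make(map[int64]bool)
	var out []int64
	collect := func(seqNos []int64) {
		for _, n := range seqNos {
			if !seen[n] {
				seen[n] = true
				out = append(out, n)
			}
		}
	}

	// Step 2: search up to the checkpoint; some documents below it may not be
	// searchable yet (e.g. hits 1, 2, 3, 5 out of 1..5).
	first, err := s.SearchSeqNoRange(ctx, after, upTo)
	if err != nil {
		return nil, err
	}
	collect(first)

	// Step 3: manual refresh makes the remaining documents searchable.
	if err := s.Refresh(ctx); err != nil {
		return nil, err
	}

	// Step 4: search again to pick up the stragglers (e.g. 4, 5).
	again, err := s.SearchSeqNoRange(ctx, after, upTo)
	if err != nil {
		return nil, err
	}
	collect(again)

	return out, nil
}

In Run above, the same idea is driven by the inner count == m.fetchSize loop: each fetch covers the window between the last stored checkpoint and the value returned by the checkpoint "wait advance" call.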
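
Both the retry delay and the debounce use sleep.WithContext. A minimal assumed equivalent, sketched here rather than taken from internal/pkg/sleep, could live in the same sketch package with "time" added to its imports:

// sleepWithContext blocks for d, or returns ctx.Err() early if the context is
// done first. Run relies on that early error to exit promptly on shutdown;
// plain context.Canceled is then normalized to nil by the deferred handler.
func sleepWithContext(ctx context.Context, d time.Duration) error {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-t.C:
		return nil
	}
}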