pkg/stanza/fileconsumer/internal/tracker/tracker.go

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tracker // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/extension/xextension/storage"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fileset"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)

const (
	archiveIndexKey = "knownFilesArchiveIndex"
	// Note: the misspelling in this value is kept as-is; it is a persisted
	// storage key, so correcting it would orphan previously stored data.
	archivePollsToArchiveKey = "knonwFilesPollsToArchive"
)

// Tracker is an interface for tracking files that are being consumed.
type Tracker interface {
	Add(reader *reader.Reader)
	GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Reader
	GetOpenFile(fp *fingerprint.Fingerprint) *reader.Reader
	GetClosedFile(fp *fingerprint.Fingerprint) *reader.Metadata
	GetMetadata() []*reader.Metadata
	LoadMetadata(metadata []*reader.Metadata)
	CurrentPollFiles() []*reader.Reader
	PreviousPollFiles() []*reader.Reader
	ClosePreviousFiles() int
	EndPoll()
	EndConsume() int
	TotalReaders() int
	FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata
}

// fileTracker tracks known offsets for files that are being consumed by the manager.
type fileTracker struct {
	set component.TelemetrySettings

	maxBatchFiles int

	currentPollFiles  *fileset.Fileset[*reader.Reader]
	previousPollFiles *fileset.Fileset[*reader.Reader]
	knownFiles        []*fileset.Fileset[*reader.Metadata]

	// persister is to be used to store offsets older than 3 poll cycles.
	// These offsets will be stored on disk.
	persister operator.Persister

	pollsToArchive int
	archiveIndex   int
}
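
// The snippet below is a minimal sketch of how a manager might drive a
// Tracker across one poll cycle. The reader slice and the exact call order
// used by fileconsumer's manager are assumptions, not part of this file:
//
//	t := NewFileTracker(ctx, set, maxBatchFiles, pollsToArchive, persister)
//	for _, r := range readersOpenedThisPoll { // hypothetical slice
//		t.Add(r) // register readers discovered in this poll
//	}
//	t.ClosePreviousFiles() // close last poll's readers; metadata -> knownFiles[0]
//	t.EndConsume()         // currentPollFiles -> previousPollFiles
//	t.EndPoll()            // rotate knownFiles generations, archiving the oldest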
func NewFileTracker(ctx context.Context, set component.TelemetrySettings, maxBatchFiles int, pollsToArchive int, persister operator.Persister) Tracker {
	knownFiles := make([]*fileset.Fileset[*reader.Metadata], 3)
	for i := 0; i < len(knownFiles); i++ {
		knownFiles[i] = fileset.New[*reader.Metadata](maxBatchFiles)
	}
	set.Logger = set.Logger.With(zap.String("tracker", "fileTracker"))
	t := &fileTracker{
		set:               set,
		maxBatchFiles:     maxBatchFiles,
		currentPollFiles:  fileset.New[*reader.Reader](maxBatchFiles),
		previousPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
		knownFiles:        knownFiles,
		pollsToArchive:    pollsToArchive,
		persister:         persister,
		archiveIndex:      0,
	}
	if t.archiveEnabled() {
		t.restoreArchiveIndex(ctx)
	}
	return t
}

func (t *fileTracker) Add(reader *reader.Reader) {
	// add a new reader for tracking
	t.currentPollFiles.Add(reader)
}

func (t *fileTracker) GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Reader {
	return t.currentPollFiles.Match(fp, fileset.Equal)
}

func (t *fileTracker) GetOpenFile(fp *fingerprint.Fingerprint) *reader.Reader {
	return t.previousPollFiles.Match(fp, fileset.StartsWith)
}

func (t *fileTracker) GetClosedFile(fp *fingerprint.Fingerprint) *reader.Metadata {
	for i := 0; i < len(t.knownFiles); i++ {
		if oldMetadata := t.knownFiles[i].Match(fp, fileset.StartsWith); oldMetadata != nil {
			return oldMetadata
		}
	}
	return nil
}

func (t *fileTracker) GetMetadata() []*reader.Metadata {
	// return all known metadata for checkpointing
	allCheckpoints := make([]*reader.Metadata, 0, t.TotalReaders())
	for _, knownFiles := range t.knownFiles {
		allCheckpoints = append(allCheckpoints, knownFiles.Get()...)
	}
	for _, r := range t.previousPollFiles.Get() {
		allCheckpoints = append(allCheckpoints, r.Metadata)
	}
	return allCheckpoints
}

func (t *fileTracker) LoadMetadata(metadata []*reader.Metadata) {
	t.knownFiles[0].Add(metadata...)
}

func (t *fileTracker) CurrentPollFiles() []*reader.Reader {
	return t.currentPollFiles.Get()
}

func (t *fileTracker) PreviousPollFiles() []*reader.Reader {
	return t.previousPollFiles.Get()
}

func (t *fileTracker) ClosePreviousFiles() (filesClosed int) {
	// t.previousPollFiles -> t.knownFiles[0]
	for r, _ := t.previousPollFiles.Pop(); r != nil; r, _ = t.previousPollFiles.Pop() {
		t.knownFiles[0].Add(r.Close())
		filesClosed++
	}
	return
}

func (t *fileTracker) EndConsume() (filesClosed int) {
	filesClosed = t.ClosePreviousFiles()

	// t.currentPollFiles -> t.previousPollFiles
	t.previousPollFiles = t.currentPollFiles
	t.currentPollFiles = fileset.New[*reader.Reader](t.maxBatchFiles)
	return
}
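
// The generation shift performed by EndPoll below can be pictured with
// hypothetical sets A, B and C (illustrative only):
//
//	before: knownFiles = [A, B, C]
//	        archive(C)               // only when archiving is enabled
//	after:  knownFiles = [new, A, B] // copy shifts right, index 0 is reset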
func (t *fileTracker) EndPoll() {
	// shift the filesets at the end of every poll() call
	// t.knownFiles[0] -> t.knownFiles[1] -> t.knownFiles[2]

	// Instead of throwing knownFiles[2] away, archive it.
	if t.archiveEnabled() {
		t.archive(t.knownFiles[2])
	}
	copy(t.knownFiles[1:], t.knownFiles)
	t.knownFiles[0] = fileset.New[*reader.Metadata](t.maxBatchFiles)
}

func (t *fileTracker) TotalReaders() int {
	total := t.previousPollFiles.Len()
	for i := 0; i < len(t.knownFiles); i++ {
		total += t.knownFiles[i].Len()
	}
	return total
}

func (t *fileTracker) restoreArchiveIndex(ctx context.Context) {
	// remove extra "keys" once archive restoration is done
	defer t.removeExtraKeys(ctx)

	// store the current pollsToArchive
	defer func() {
		if err := t.persister.Set(ctx, archivePollsToArchiveKey, encodeIndex(t.pollsToArchive)); err != nil {
			t.set.Logger.Error("Error storing polls_to_archive", zap.Error(err))
		}
	}()

	previousPollsToArchive, err := t.getPreviousPollsToArchive(ctx)
	if err != nil {
		// if there's an error reading previousPollsToArchive, default to the current value
		previousPollsToArchive = t.pollsToArchive
	}

	t.archiveIndex, err = t.getArchiveIndex(ctx)
	if err != nil {
		t.set.Logger.Error("error while reading the archiveIndexKey. Starting from 0", zap.Error(err))
		return
	}

	if previousPollsToArchive < t.pollsToArchive {
		// if the archive size has increased, we just increment the index until we encounter a nil value
		for t.archiveIndex < t.pollsToArchive && t.isSet(ctx, t.archiveIndex) {
			t.archiveIndex++
		}
	} else if previousPollsToArchive > t.pollsToArchive {
		// we only attempt to rewrite the archive if its size has shrunk
		t.set.Logger.Warn("polls_to_archive has changed. Will attempt to rewrite archive")
		t.rewriteArchive(ctx, previousPollsToArchive)
	}
}
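
// Worked index math for rewriteArchive below, with hypothetical values:
// previousPollsToArchive = 6, pollsToArchive = 3, archiveIndex = 5. The three
// newest entries live at ring slots 2, 3 and 4:
//
//	leastRecentIndex := mod(5-3, 6) // = 2, the oldest entry surviving the shrink
//	// slots 2, 3, 4 are rewritten into slots 0, 1, 2, and archiveIndex resets to 0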
func (t *fileTracker) rewriteArchive(ctx context.Context, previousPollsToArchive int) {
	// helper to rewrite data from oldIdx to newIdx
	rewrite := func(newIdx, oldIdx int) error {
		oldVal, err := t.persister.Get(ctx, archiveKey(oldIdx))
		if err != nil {
			return err
		}
		return t.persister.Set(ctx, archiveKey(newIdx), oldVal)
	}

	// Calculate the least recent index, w.r.t. the new archive size
	leastRecentIndex := mod(t.archiveIndex-t.pollsToArchive, previousPollsToArchive)

	// Refer to archive.md for the detailed design
	if mod(t.archiveIndex-1, previousPollsToArchive) > t.pollsToArchive {
		for i := 0; i < t.pollsToArchive; i++ {
			if err := rewrite(i, leastRecentIndex); err != nil {
				t.set.Logger.Error("error while swapping archive", zap.Error(err))
			}
			leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive
		}
		t.archiveIndex = 0
	} else {
		if !t.isSet(ctx, t.archiveIndex) {
			// If the current index points at an unset key, there is no need to do anything
			return
		}
		for i := 0; i < t.pollsToArchive-t.archiveIndex; i++ {
			if err := rewrite(t.archiveIndex+i, leastRecentIndex); err != nil {
				t.set.Logger.Warn("error while swapping archive", zap.Error(err))
			}
			leastRecentIndex = (leastRecentIndex + 1) % previousPollsToArchive
		}
	}
}

func (t *fileTracker) removeExtraKeys(ctx context.Context) {
	for i := t.pollsToArchive; t.isSet(ctx, i); i++ {
		if err := t.persister.Delete(ctx, archiveKey(i)); err != nil {
			t.set.Logger.Error("error while cleaning extra keys", zap.Error(err))
		}
	}
}

func (t *fileTracker) getPreviousPollsToArchive(ctx context.Context) (int, error) {
	byteIndex, err := t.persister.Get(ctx, archivePollsToArchiveKey)
	if err != nil {
		t.set.Logger.Error("error while reading the archivePollsToArchiveKey", zap.Error(err))
		return 0, err
	}
	previousPollsToArchive, err := decodeIndex(byteIndex)
	if err != nil {
		t.set.Logger.Error("error while decoding previousPollsToArchive", zap.Error(err))
		return 0, err
	}
	return previousPollsToArchive, nil
}

func (t *fileTracker) getArchiveIndex(ctx context.Context) (int, error) {
	byteIndex, err := t.persister.Get(ctx, archiveIndexKey)
	if err != nil {
		return 0, err
	}
	archiveIndex, err := decodeIndex(byteIndex)
	if err != nil {
		return 0, err
	}
	return archiveIndex, nil
}

func (t *fileTracker) archive(metadata *fileset.Fileset[*reader.Metadata]) {
	// We make use of a ring buffer, where each set of files is stored under a specific index.
	// Instead of discarding knownFiles[2], write it to the next index and eventually roll over.
	// Separate storage keys knownFiles0, knownFiles1, ..., knownFilesN, then roll over back to knownFiles0.
	//
	// Archiving:  ┌─────────────────────on-disk archive─────────────────────────┐
	//             │   ┌───┐     ┌───┐             ┌──────────────────┐          │
	// index       │ ▶ │ 0 │  ▶  │ 1 │  ▶  ...  ▶  │ polls_to_archive │          │
	//             │ ▲ └───┘     └───┘             └──────────────────┘          │
	//             │ ▲                                      ▼                    │
	//             │ ▲    roll over, overwriting older offsets, if any  ◀        │
	//             └─│──────────────────────────────────────────────────────────┘
	//               │
	//             start
	//             index
	index := t.archiveIndex
	t.archiveIndex = (t.archiveIndex + 1) % t.pollsToArchive                      // increment the index
	indexOp := storage.SetOperation(archiveIndexKey, encodeIndex(t.archiveIndex)) // batch the updated index with metadata
	if err := t.writeArchive(index, metadata, indexOp); err != nil {
		t.set.Logger.Error("error while saving to the archive", zap.Error(err))
	}
}

// readArchive loads data from the archive for a given index and returns a fileset.Fileset.
func (t *fileTracker) readArchive(index int) (*fileset.Fileset[*reader.Metadata], error) {
	metadata, err := checkpoint.LoadKey(context.Background(), t.persister, archiveKey(index))
	if err != nil {
		return nil, err
	}
	f := fileset.New[*reader.Metadata](len(metadata))
	f.Add(metadata...)
	return f, nil
}
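
// Read-modify-write round trip used by FindFiles below (sketch; error
// handling elided, the index value 2 is hypothetical). This assumes
// fileset.Match removes the matched entry from the loaded set, which is why
// the modified set is persisted again:
//
//	data, _ := t.readArchive(2)              // load the fileset stored under knownFiles2
//	md := data.Match(fp, fileset.StartsWith) // match and take one entry, if present
//	_ = t.writeArchive(2, data)              // persist the set minus matched entries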
// writeArchive saves data to the archive for a given index and returns an error, if encountered.
func (t *fileTracker) writeArchive(index int, rmds *fileset.Fileset[*reader.Metadata], ops ...*storage.Operation) error {
	return checkpoint.SaveKey(context.Background(), t.persister, rmds.Get(), archiveKey(index), ops...)
}

func (t *fileTracker) archiveEnabled() bool {
	return t.pollsToArchive > 0 && t.persister != nil
}

func (t *fileTracker) isSet(ctx context.Context, index int) bool {
	val, err := t.persister.Get(ctx, archiveKey(index))
	return val != nil && err == nil
}

// FindFiles goes through the archive, one fileset at a time, and tries to match all fingerprints against that loaded set.
func (t *fileTracker) FindFiles(fps []*fingerprint.Fingerprint) []*reader.Metadata {
	// To minimize disk access, we first access the index, then review unmatched files and update the metadata, if found.
	// We exit if all fingerprints are matched.

	// Track the number of matched fingerprints so we can exit early once all are matched.
	var numMatched int

	// Determine the index for reading the archive, starting from the most recent and moving towards the oldest
	nextIndex := t.archiveIndex
	matchedMetadata := make([]*reader.Metadata, len(fps))

	// continue executing the loop until either all records are matched or all archive sets have been processed.
	for i := 0; i < t.pollsToArchive; i++ {
		// Move to the next index, from most recent towards oldest
		nextIndex = (nextIndex - 1 + t.pollsToArchive) % t.pollsToArchive

		data, err := t.readArchive(nextIndex) // we load each fileset at most once per poll
		if err != nil {
			t.set.Logger.Error("error while opening archive", zap.Error(err))
			continue
		}
		archiveModified := false
		for j, fp := range fps {
			if matchedMetadata[j] != nil {
				// we've already found a match for this fingerprint, continue
				continue
			}
			if md := data.Match(fp, fileset.StartsWith); md != nil {
				// update the matched metadata for this fingerprint
				matchedMetadata[j] = md
				archiveModified = true
				numMatched++
			}
		}
		if !archiveModified {
			continue
		}
		// we save each fileset at most once per poll
		if err := t.writeArchive(nextIndex, data); err != nil {
			t.set.Logger.Error("error while saving archive", zap.Error(err))
		}
		// Check if all metadata have been found
		if numMatched == len(fps) {
			return matchedMetadata
		}
	}
	return matchedMetadata
}
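
// FindFiles usage sketch (hypothetical caller; in practice the manager would
// pass fingerprints of newly discovered files to recover archived offsets):
//
//	fps := []*fingerprint.Fingerprint{fp1, fp2} // fp1, fp2 assumed
//	mds := t.FindFiles(fps)                     // mds[i] is nil if fps[i] had no archived match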
// noStateTracker only tracks the files from the current poll. Once the poll is
// complete and telemetry is consumed, the tracked files are closed. The next
// poll will create fresh readers with no previously tracked offsets.
type noStateTracker struct {
	set              component.TelemetrySettings
	maxBatchFiles    int
	currentPollFiles *fileset.Fileset[*reader.Reader]
}

func NewNoStateTracker(set component.TelemetrySettings, maxBatchFiles int) Tracker {
	set.Logger = set.Logger.With(zap.String("tracker", "noStateTracker"))
	return &noStateTracker{
		set:              set,
		maxBatchFiles:    maxBatchFiles,
		currentPollFiles: fileset.New[*reader.Reader](maxBatchFiles),
	}
}

func (t *noStateTracker) Add(reader *reader.Reader) {
	// add a new reader for tracking
	t.currentPollFiles.Add(reader)
}

func (t *noStateTracker) CurrentPollFiles() []*reader.Reader {
	return t.currentPollFiles.Get()
}

func (t *noStateTracker) GetCurrentFile(fp *fingerprint.Fingerprint) *reader.Reader {
	return t.currentPollFiles.Match(fp, fileset.Equal)
}

func (t *noStateTracker) EndConsume() (filesClosed int) {
	for r, _ := t.currentPollFiles.Pop(); r != nil; r, _ = t.currentPollFiles.Pop() {
		r.Close()
		filesClosed++
	}
	return
}

func (t *noStateTracker) GetOpenFile(_ *fingerprint.Fingerprint) *reader.Reader { return nil }

func (t *noStateTracker) GetClosedFile(_ *fingerprint.Fingerprint) *reader.Metadata { return nil }

func (t *noStateTracker) GetMetadata() []*reader.Metadata { return nil }

func (t *noStateTracker) LoadMetadata(_ []*reader.Metadata) {}

func (t *noStateTracker) PreviousPollFiles() []*reader.Reader { return nil }

func (t *noStateTracker) ClosePreviousFiles() int { return 0 }

func (t *noStateTracker) EndPoll() {}

func (t *noStateTracker) TotalReaders() int { return 0 }

func (t *noStateTracker) FindFiles([]*fingerprint.Fingerprint) []*reader.Metadata { return nil }

func encodeIndex(val int) []byte {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)

	// Encode the index
	if err := enc.Encode(val); err != nil {
		return nil
	}
	return buf.Bytes()
}

func decodeIndex(buf []byte) (int, error) {
	var index int

	// Decode the index
	dec := json.NewDecoder(bytes.NewReader(buf))
	err := dec.Decode(&index)
	return max(index, 0), err
}

func archiveKey(i int) string {
	return fmt.Sprintf("knownFiles%d", i)
}

func mod(x, y int) int {
	return (x + y) % y
}
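
// Round-trip sketch for the index helpers above (the value 7 is hypothetical):
//
//	b := encodeIndex(7)    // JSON bytes: "7" plus the trailing newline added by Encode
//	i, _ := decodeIndex(b) // i == 7; negative decoded values are clamped to 0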