pkg/stanza/fileconsumer/file.go
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"

import (
	"context"
	"fmt"
	"os"
	"sync"
	"time"

	"go.opentelemetry.io/collector/component"
	"go.uber.org/zap"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/checkpoint"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/metadata"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/tracker"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)
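
// Manager polls the filesystem for files matching the configured patterns and
// coordinates the readers that consume them. A minimal lifecycle sketch,
// assuming the Manager was built from this package's Config (see config.go):
//
//	if err := m.Start(persister); err != nil { // persister may be nil
//		return err
//	}
//	defer func() { _ = m.Stop() }()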
type Manager struct {
	set              component.TelemetrySettings
	wg               sync.WaitGroup
	cancel           context.CancelFunc
	readerFactory    reader.Factory
	fileMatcher      *matcher.Matcher
	tracker          tracker.Tracker
	noTracking       bool
	pollInterval     time.Duration
	persister        operator.Persister
	maxBatches       int
	maxBatchFiles    int
	pollsToArchive   int
	telemetryBuilder *metadata.TelemetryBuilder
}
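
// Start loads any previously saved offsets from the persister (if one is
// provided) and launches the polling goroutine. When saved offsets are
// present, reading resumes from the stored positions and the 'start_at'
// setting is ignored.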
func (m *Manager) Start(persister operator.Persister) error {
	ctx, cancel := context.WithCancel(context.Background())
	m.cancel = cancel

	if _, err := m.fileMatcher.MatchFiles(); err != nil {
		m.set.Logger.Warn("finding files", zap.Error(err))
	}

	// instantiate the tracker
	m.instantiateTracker(ctx, persister)

	if persister != nil {
		m.persister = persister
		offsets, err := checkpoint.Load(ctx, m.persister)
		if err != nil {
			return fmt.Errorf("read known files from database: %w", err)
		}
		if len(offsets) > 0 {
			m.set.Logger.Info("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
			m.readerFactory.FromBeginning = true
			m.tracker.LoadMetadata(offsets)
		}
	} else if m.pollsToArchive > 0 {
		m.set.Logger.Error("archiving is not supported in memory, please use a storage extension")
	}

	// Start polling goroutine
	m.startPoller(ctx)

	return nil
}

// Stop will stop the file monitoring process
func (m *Manager) Stop() error {
	if m.cancel != nil {
		m.cancel()
		m.cancel = nil
	}
	m.wg.Wait()
	if m.tracker != nil {
		// Decrement the open-file gauge by the number of readers the tracker closed.
		m.telemetryBuilder.FileconsumerOpenFiles.Add(context.TODO(), int64(0-m.tracker.ClosePreviousFiles()))
	}
	if m.persister != nil {
		if err := checkpoint.Save(context.Background(), m.persister, m.tracker.GetMetadata()); err != nil {
			m.set.Logger.Error("save offsets", zap.Error(err))
		}
	}
	return nil
}

// startPoller kicks off a goroutine that will poll the filesystem periodically,
// checking if there are new files or new logs in the watched files
func (m *Manager) startPoller(ctx context.Context) {
	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		globTicker := time.NewTicker(m.pollInterval)
		defer globTicker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-globTicker.C:
			}

			m.poll(ctx)
		}
	}()
}

// poll checks all the watched paths for new entries
func (m *Manager) poll(ctx context.Context) {
	// Used to keep track of the number of batches processed in this poll cycle
	batchesProcessed := 0

	// Get the list of paths on disk
	matches, err := m.fileMatcher.MatchFiles()
	if err != nil {
		m.set.Logger.Debug("finding files", zap.Error(err))
	}
	m.set.Logger.Debug("matched files", zap.Strings("paths", matches))

	for len(matches) > m.maxBatchFiles {
		m.consume(ctx, matches[:m.maxBatchFiles])

		// If maxBatches is set, check if we have hit the limit
		if m.maxBatches != 0 {
			batchesProcessed++
			if batchesProcessed >= m.maxBatches {
				return
			}
		}

		matches = matches[m.maxBatchFiles:]
	}
	m.consume(ctx, matches)

	// Any new files that appear should be consumed entirely
	m.readerFactory.FromBeginning = true
	if m.persister != nil {
		metadata := m.tracker.GetMetadata()
		if metadata != nil {
			if err := checkpoint.Save(context.Background(), m.persister, metadata); err != nil {
				m.set.Logger.Error("save offsets", zap.Error(err))
			}
		}
	}

	// rotate at end of every poll()
	m.tracker.EndPoll()
}
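
// consume creates readers for the given batch of paths and reads each matched
// file to its end, processing the files in the batch concurrently.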
func (m *Manager) consume(ctx context.Context, paths []string) {
	m.set.Logger.Debug("Consuming files", zap.Strings("paths", paths))
	m.makeReaders(ctx, paths)

	m.readLostFiles(ctx)

	// read new readers to end
	var wg sync.WaitGroup
	for _, r := range m.tracker.CurrentPollFiles() {
		wg.Add(1)
		go func(r *reader.Reader) {
			defer wg.Done()
			m.telemetryBuilder.FileconsumerReadingFiles.Add(ctx, 1)
			r.ReadToEnd(ctx)
			m.telemetryBuilder.FileconsumerReadingFiles.Add(ctx, -1)
		}(r)
	}
	wg.Wait()

	// Decrement the open-file gauge by the number of readers the tracker closed.
	m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, int64(0-m.tracker.EndConsume()))
}
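
// makeFingerprint opens the file at path and computes its fingerprint. It
// returns nil for files that cannot be opened or fingerprinted, or that are
// still empty.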
func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
	file, err := os.Open(path) // #nosec - operator must read in files defined by user
	if err != nil {
		m.set.Logger.Error("Failed to open file", zap.Error(err))
		return nil, nil
	}

	fp, err := m.readerFactory.NewFingerprint(file)
	if err != nil {
		if err = file.Close(); err != nil {
			m.set.Logger.Debug("problem closing file", zap.Error(err))
		}
		return nil, nil
	}

	if fp.Len() == 0 {
		// Empty file, don't read it until we can compare its fingerprint
		if err = file.Close(); err != nil {
			m.set.Logger.Debug("problem closing file", zap.Error(err))
		}
		return nil, nil
	}
	return fp, file
}

// makeReaders takes a list of file paths and creates a reader for each,
// discarding any that share a fingerprint with a file that has already
// been read this polling interval
func (m *Manager) makeReaders(ctx context.Context, paths []string) {
	for _, path := range paths {
		fp, file := m.makeFingerprint(path)
		if fp == nil {
			continue
		}

		// Exclude duplicate paths with the same content. This can happen when files are
		// being rotated with copy/truncate strategy. (After copy, prior to truncate.)
		if r := m.tracker.GetCurrentFile(fp); r != nil {
			m.set.Logger.Debug("Skipping duplicate file", zap.String("path", file.Name()))
			// re-add the reader as Match() removes duplicates
			m.tracker.Add(r)
			if err := file.Close(); err != nil {
				m.set.Logger.Debug("problem closing file", zap.Error(err))
			}
			continue
		}

		r, err := m.newReader(ctx, file, fp)
		if err != nil {
			m.set.Logger.Error("Failed to create reader", zap.Error(err))
			continue
		}

		m.tracker.Add(r)
	}
}
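
// newReader creates a reader for the file, reusing metadata from a matching
// reader in the previous poll cycle or from a previously closed file when one
// exists; otherwise it starts watching the file from scratch.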
func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
	// Check previous poll cycle for match
	if oldReader := m.tracker.GetOpenFile(fp); oldReader != nil {
		if oldReader.GetFileName() != file.Name() {
			if !oldReader.Validate() {
				m.set.Logger.Debug(
					"File has been rotated(truncated)",
					zap.String("original_path", oldReader.GetFileName()),
					zap.String("rotated_path", file.Name()))
			} else {
				m.set.Logger.Debug(
					"File has been rotated(moved)",
					zap.String("original_path", oldReader.GetFileName()),
					zap.String("rotated_path", file.Name()))
			}
		}
		return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
	}

	// Check for closed files for match
	if oldMetadata := m.tracker.GetClosedFile(fp); oldMetadata != nil {
		r, err := m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
		if err != nil {
			return nil, err
		}
		m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
		return r, nil
	}

	// If we don't match any previously known files, create a new reader from scratch
	m.set.Logger.Info("Started watching file", zap.String("path", file.Name()))
	r, err := m.readerFactory.NewReader(file, fp)
	if err != nil {
		return nil, err
	}
	m.telemetryBuilder.FileconsumerOpenFiles.Add(ctx, 1)
	return r, nil
}
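
// instantiateTracker selects the tracker implementation: a stateless tracker
// when tracking is disabled, otherwise a file tracker that can archive old
// offsets via the persister.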
func (m *Manager) instantiateTracker(ctx context.Context, persister operator.Persister) {
	var t tracker.Tracker
	if m.noTracking {
		t = tracker.NewNoStateTracker(m.set, m.maxBatchFiles)
	} else {
		t = tracker.NewFileTracker(ctx, m.set, m.maxBatchFiles, m.pollsToArchive, persister)
	}
	m.tracker = t
}