pkg/utils/fs/watcher.go (72 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package fs import ( "context" "os" "sync" "time" logf "sigs.k8s.io/controller-runtime/pkg/log" ) var log = logf.Log.WithName("fs-watcher") // FileWatcher watches a given set of file paths, not directories, for changes based on the file's mtime. type FileWatcher struct { ctx context.Context onChange func([]string) interval time.Duration cache fileModTimeCache once sync.Once } // NewFileWatcher creates a new file watcher, use ctx context for cancellation, paths to specify the files to watch. // onChange is a callback to be invoked when changes are detected, a list of affected files will be passed as argument. // interval determines how often the file watcher will try to detect changes to the files of interest. func NewFileWatcher(ctx context.Context, paths []string, onChange func([]string), interval time.Duration) *FileWatcher { return &FileWatcher{ ctx: ctx, onChange: onChange, interval: interval, cache: newFileModTimeCache(paths), } } // Run starts the file watcher. Should be typically run inside a go routine. func (fw *FileWatcher) Run() { fw.once.Do(func() { ticker := time.NewTicker(fw.interval) defer ticker.Stop() for { select { case <-fw.ctx.Done(): return case <-ticker.C: updated := fw.cache.Update() if len(updated) > 0 { fw.onChange(updated) } } } }) } type fileModTimeCache map[string]time.Time func newFileModTimeCache(paths []string) fileModTimeCache { cache := fileModTimeCache(map[string]time.Time{}) for _, f := range paths { cache[f] = time.Time{} } _ = cache.Update() return cache } func (fmc fileModTimeCache) Update() []string { var updated []string for f, prev := range fmc { stat, err := os.Stat(f) if err != nil { switch { case os.IsNotExist(err) && !prev.IsZero(): // file was deleted updated = append(updated, f) fmc[f] = time.Time{} case os.IsNotExist(err): // file does not exist can be ignored default: log.Error(err, "while getting file info", "file", f, "err", err.Error()) } continue } if prev != stat.ModTime() { updated = append(updated, f) fmc[f] = stat.ModTime() } } return updated }