in internal/pkg/composable/providers/filesource/filesource.go [59:175]
// Run watches the directories that contain the configured source files and
// publishes the file contents to comm, keyed by source name.
//
// After the watches are registered every source is read once and published.
// From then on a source is re-read whenever a create, write, remove, or rename
// event is reported for its path, and the context is published again only if
// at least one value changed. When the watcher reports an event overflow the
// pending events are drained and every source is read and published again; a
// publish failure on that path is logged rather than returned.
//
// Run returns ctx.Err() once ctx is done. It returns an error when the watcher
// cannot be created, a directory cannot be watched, the initial publish or an
// event-driven publish fails, or the watcher closes one of its channels.
func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProviderComm) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return fmt.Errorf("failed to create watcher: %w", err)
	}
	defer watcher.Close()

	// invert the mapping to map paths to source names (several sources may
	// point at the same path)
	inverted := make(map[string][]string, len(c.cfg.Sources))
	for sourceName, sourceCfg := range c.cfg.Sources {
		inverted[sourceCfg.Path] = append(inverted[sourceCfg.Path], sourceName)
	}

	// determine the paths to watch (watch is performed on the directories that contain the file)
	//
	// you cannot register the same directory multiple times so this ensures it is only registered once
	paths := make([]string, 0, len(c.cfg.Sources))
	for _, cfg := range c.cfg.Sources {
		parent := filepath.Dir(cfg.Path)
		if !slices.Contains(paths, parent) {
			paths = append(paths, parent)
		}
	}
	for _, path := range paths {
		if err := watcher.Add(path); err != nil {
			return fmt.Errorf("failed to watch path %q: %w", path, err)
		}
	}

	// read the initial values after the watch has started
	// this ensures that if the value changed between this code and the loop below
	// the updated file changes will not be missed
	current := make(map[string]interface{}, len(c.cfg.Sources))
	readAll := func() error {
		for path, sources := range inverted {
			value := c.readContents(path)
			for _, source := range sources {
				current[source] = value
			}
		}
		if err := comm.Set(current); err != nil {
			return fmt.Errorf("failed to set current context: %w", err)
		}
		return nil
	}
	if err := readAll(); err != nil {
		// context for the error already added
		return err
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err, ok := <-watcher.Errors:
			if !ok {
				// a closed channel is always ready to receive; returning here
				// prevents the loop from spinning on it
				return errors.New("file watcher errors channel closed")
			}
			c.logger.Errorf("file watcher errored: %s", err)
			if !errors.Is(err, fsnotify.ErrEventOverflow) {
				continue
			}
			// the queue is full and some events have been dropped
			// at this point we don't know what has changed
			// clear the queue of events and read all again
			c.logger.Debug("draining file watcher queue")
			drainQueue(watcher.Events)
			c.logger.Infof("reading all sources to handle overflow")
			if err := readAll(); err != nil {
				// context for the error already added
				c.logger.Error(err)
			}
		case e, ok := <-watcher.Events:
			if !ok {
				// same reasoning as for the errors channel above
				return errors.New("file watcher events channel closed")
			}
			path := filepath.Clean(e.Name)
			// Windows paths are case-insensitive
			// NOTE(review): this assumes the configured paths were already cleaned
			// and lower-cased on Windows when the config was built; confirm.
			if runtime.GOOS == "windows" {
				path = strings.ToLower(path)
			}
			sources, watched := inverted[path]
			if !watched {
				// watching the directory, it can contain files that we are not watching
				// ignore these events unless we are actively watching this file
				continue
			}
			// file was created, updated, deleted, or renamed away (update the value);
			// fsnotify reports Rename under the old name, so a moved file is
			// re-read just like a removed one
			if e.Op&(fsnotify.Create|fsnotify.Write|fsnotify.Remove|fsnotify.Rename) == 0 {
				continue
			}
			value := c.readContents(path)
			changed := false
			for _, source := range sources {
				if current[source] != value {
					current[source] = value
					changed = true
				}
			}
			if !changed {
				// nothing differs from what was last published
				continue
			}
			if err := comm.Set(current); err != nil {
				return fmt.Errorf("failed to set current context from notify event: %w", err)
			}
		}
	}
}