func()

in internal/pkg/composable/providers/filesource/filesource.go [59:175]


// Run watches the directories containing the configured source files and
// publishes the file contents through comm whenever they change.
//
// The watches are registered first, then every source is read once and the
// result is published with comm.Set. After that the method loops until ctx is
// done, re-reading a source file whenever the watcher reports that it was
// created, written, removed, or renamed, and re-reading every source when the
// watcher reports an event overflow.
//
// It returns ctx.Err() once ctx is done. It returns an error if the watcher
// cannot be created, a directory cannot be watched, comm.Set fails for the
// initial read or for a change event, or one of the watcher channels is closed
// while the loop is still running.
func (c *contextProvider) Run(ctx context.Context, comm corecomp.ContextProviderComm) error {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return fmt.Errorf("failed to create watcher: %w", err)
	}
	defer watcher.Close()

	// invert the mapping to map paths to source names
	// (several sources may point at the same file; append on the nil slice
	// handles the first entry for a path)
	inverted := make(map[string][]string, len(c.cfg.Sources))
	for sourceName, sourceCfg := range c.cfg.Sources {
		inverted[sourceCfg.Path] = append(inverted[sourceCfg.Path], sourceName)
	}

	// determine the paths to watch (watch is performed on the directories that contain the file)
	//
	// you cannot register the same directory multiple times so this ensures it's only registered once
	paths := make([]string, 0, len(c.cfg.Sources))
	for _, cfg := range c.cfg.Sources {
		parent := filepath.Dir(cfg.Path)
		if !slices.Contains(paths, parent) {
			paths = append(paths, parent)
		}
	}
	for _, path := range paths {
		if err := watcher.Add(path); err != nil {
			return fmt.Errorf("failed to watch path %q: %w", path, err)
		}
	}

	// read the initial values after the watch has started
	// this ensures that if the value changed between this code and the loop below
	// the updated file changes will not be missed
	current := make(map[string]interface{}, len(c.cfg.Sources))
	// readAll re-reads every watched file into current and publishes the result.
	readAll := func() error {
		for path, sources := range inverted {
			value := c.readContents(path)
			for _, source := range sources {
				current[source] = value
			}
		}
		if err := comm.Set(current); err != nil {
			return fmt.Errorf("failed to set current context: %w", err)
		}
		return nil
	}
	if err := readAll(); err != nil {
		// context for the error already added
		return err
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err, ok := <-watcher.Errors:
			if !ok {
				// a closed channel is always ready to receive; returning here
				// prevents the select from spinning forever
				return errors.New("file watcher errors channel closed unexpectedly")
			}
			c.logger.Errorf("file watcher errored: %s", err)
			if !errors.Is(err, fsnotify.ErrEventOverflow) {
				continue
			}
			// the queue is full and some events have been dropped
			// at this point we don't know what has changed
			// clear the queue of events and read all again
			c.logger.Debug("draining file watcher queue")
			drainQueue(watcher.Events)
			c.logger.Infof("reading all sources to handle overflow")
			if err := readAll(); err != nil {
				// context for the error already added
				// (logged rather than returned so the provider keeps running)
				c.logger.Error(err)
			}
		case e, ok := <-watcher.Events:
			if !ok {
				// see the note on the errors channel above
				return errors.New("file watcher events channel closed unexpectedly")
			}
			path := filepath.Clean(e.Name)
			// Windows paths are case-insensitive
			// NOTE(review): assumes the configured source paths were cleaned
			// (and lower-cased on Windows) before Run is called, otherwise the
			// lookup below cannot match — confirm against the config loading code
			if runtime.GOOS == "windows" {
				path = strings.ToLower(path)
			}
			sources, watched := inverted[path]
			if !watched {
				// watching the directory, it can contain files that we are not watching
				// ignore these events unless we are actively watching this file
				continue
			}
			// Rename is included because moving a watched file away is reported
			// as a Rename on its old name; without it the old value would stay
			// published even though the file is gone.
			if e.Op&(fsnotify.Create|fsnotify.Write|fsnotify.Remove|fsnotify.Rename) == 0 {
				continue
			}

			// file was created, updated, deleted, or renamed (update the value)
			changed := false
			value := c.readContents(path)
			for _, source := range sources {
				if current[source] != value {
					current[source] = value
					changed = true
				}
			}
			if !changed {
				// nothing differs from what was already published
				continue
			}
			if err := comm.Set(current); err != nil {
				return fmt.Errorf("failed to set current context from notify event: %w", err)
			}
		}
	}
}