func Observe()

in config/snapshot/config_snapshot.go [43:91]


// Observe reads the config at configPath once and then re-reads it every time
// ticker fires, delivering snapshots on the returned channel.
//
// The initial read happens synchronously; if it fails, Observe returns a nil
// channel and the wrapped error. Otherwise a goroutine is started that first
// sends the initial snapshot and then, for each tick, sends any snapshot that
// updateHash returns successfully. The channel is closed when ctx is done, or
// right after the initial snapshot if ticker is nil.
//
// A nil log is replaced with a fresh logrus logger. Sends on the returned
// channel are unbuffered, so a slow consumer delays subsequent reads.
func Observe(ctx context.Context, log logrus.FieldLogger, client gcs.ConditionalClient, configPath gcs.Path, ticker <-chan time.Time) (<-chan *Config, error) {
	ch := make(chan *Config)
	if log == nil {
		log = logrus.New()
	}
	log = log.WithField("observed-path", configPath.String())

	initialSnap, err := updateHash(ctx, client, configPath)
	if err != nil {
		return nil, fmt.Errorf("can't read %q: %w", configPath.String(), err)
	}
	// Subsequent reads are conditioned on the generation differing from the
	// last one delivered. cond is passed by pointer and mutated below;
	// NOTE(review): this presumes the conditional client retains the pointer
	// so that later updates to cond take effect — confirm against gcs.ConditionalClient.
	cond := storage.Conditions{
		GenerationNotMatch: initialSnap.Attrs.Generation,
	}
	client = client.If(&cond, nil)

	go func() {
		defer close(ch)
		// Guard the initial send with ctx so the goroutine cannot block
		// forever (and leak) when the caller cancels without receiving.
		select {
		case <-ctx.Done():
			return
		case ch <- initialSnap:
		}
		if ticker == nil {
			return
		}
	nextTick:
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker:
				snap, err := updateHash(ctx, client, configPath)
				if err != nil {
					// A failed precondition is not logged; presumably it
					// means the generation is unchanged since the last send.
					if !gcs.IsPreconditionFailed(err) {
						log.WithError(err).Warning("Error fetching updated config")
					}
					continue nextTick
				}
				// Configuration changed
				select {
				case <-ctx.Done():
					return
				case ch <- snap:
					// Only advance the generation once the snapshot has
					// actually been handed to the consumer.
					cond.GenerationNotMatch = snap.Attrs.Generation
				}
			}

		}
	}()

	return ch, nil
}