func()

in readers/apiserver/watchlist/internal/watchlist/watchlist.go [445:509]


func (r *Reader) watch(ctx context.Context, rt RetrieveType, spawnWatchers []spawnWatcher) error {
	if r.fakeWatch != nil {
		return r.fakeWatch(ctx, rt, spawnWatchers)
	}

	if ctx.Err() != nil {
		return nil
	}

	var rv string
	options := metav1.ListOptions{
		ResourceVersion: rv,
		Watch:           true,
	}

	var err error
	watchers := make([]watch.Interface, len(spawnWatchers))
	defer func() {
		if err != nil {
			for _, watcher := range watchers {
				if watcher != nil {
					watcher.Stop()
				}
			}
		}
	}()
	for i := 0; i < len(spawnWatchers); i++ {
		var watcher watch.Interface
		watcher, err = spawnWatchers[i](options)
		if err != nil {
			return fmt.Errorf("error creating %v watcher: %v", rt, err)
		}
		watchers[i] = watcher
	}

	for i, watcher := range watchers {
		i := i
		watcher := watcher

		r.waitWatchers.Go(ctx, func(ctx context.Context) error {
			watcher := watcher
			for {
				// This blocks until watchEvents returns, which is when a watcher is closed.
				var err error
				_, err = r.watchEvents(ctx, watcher)
				if err != nil {
					r.log.Error(fmt.Sprintf("error watching %v events: %v", rt, err))
				}
				// This would indicate that the watcher was intentionally closed.
				if ctx.Err() != nil {
					return nil
				}

				// Since it was not intentionally closed, we should try to reconnect.
				watcher, err = spawnWatchers[i](options)
				if err != nil {
					r.log.Error(fmt.Sprintf("error creating %v watcher: %v", rt, err))
				}
				time.Sleep(time.Duration(1+rand.IntN(10)) * time.Second)
			}
		})
	}

	return nil
}