in readers/apiserver/watchlist/internal/watchlist/watchlist.go [445:509]
func (r *Reader) watch(ctx context.Context, rt RetrieveType, spawnWatchers []spawnWatcher) error {
if r.fakeWatch != nil {
return r.fakeWatch(ctx, rt, spawnWatchers)
}
if ctx.Err() != nil {
return nil
}
var rv string
options := metav1.ListOptions{
ResourceVersion: rv,
Watch: true,
}
var err error
watchers := make([]watch.Interface, len(spawnWatchers))
defer func() {
if err != nil {
for _, watcher := range watchers {
if watcher != nil {
watcher.Stop()
}
}
}
}()
for i := 0; i < len(spawnWatchers); i++ {
var watcher watch.Interface
watcher, err = spawnWatchers[i](options)
if err != nil {
return fmt.Errorf("error creating %v watcher: %v", rt, err)
}
watchers[i] = watcher
}
for i, watcher := range watchers {
i := i
watcher := watcher
r.waitWatchers.Go(ctx, func(ctx context.Context) error {
watcher := watcher
for {
// This blocks until watchEvents returns, which is when a watcher is closed.
var err error
_, err = r.watchEvents(ctx, watcher)
if err != nil {
r.log.Error(fmt.Sprintf("error watching %v events: %v", rt, err))
}
// This would indicate that the watcher was intentionally closed.
if ctx.Err() != nil {
return nil
}
// Since it was not intentionally closed, we should try to reconnect.
watcher, err = spawnWatchers[i](options)
if err != nil {
r.log.Error(fmt.Sprintf("error creating %v watcher: %v", rt, err))
}
time.Sleep(time.Duration(1+rand.IntN(10)) * time.Second)
}
})
}
return nil
}