in rest/request.go [671:757]
// Watch sends the request and, when the server answers with HTTP 200, returns
// the result of r.newStreamWatcher for that response.
//
// An error already recorded on the request (r.err) is returned immediately
// without contacting the server. Every unsuccessful attempt is passed to
// r.retry; if it permits another attempt and the pre-retry step succeeds, a
// fresh HTTP request is built and sent. Otherwise the call terminates:
//   - if net.IsProbableEOF or net.IsTimeout reports true for the error
//     returned by the HTTP client, an empty watch and a nil error are returned;
//   - for any other error returned by the HTTP client, that error is returned;
//   - if the server replied with a non-200 status, the error taken from
//     r.transformResponse is returned, or a generic "got status" error when
//     r.transformResponse yields none.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
	// Watches are intentionally not rate limited up front, so r.rateLimiter
	// is not used here; only retried attempts are throttled (see the loop).
	if r.err != nil {
		return nil, r.err
	}

	httpClient := r.c.Client
	if httpClient == nil {
		httpClient = http.DefaultClient
	}

	// The watch stream mechanism handles many common partial data errors, so
	// closed connections can be retried in many cases.
	isRetryable := func(_ *http.Request, err error) bool {
		return net.IsProbableEOF(err) || net.IsTimeout(err)
	}

	// retryAfter is non-nil only while a retry is pending; it is written by
	// the closure further down and consumed at the top of the next iteration.
	var retryAfter *RetryAfter
	target := r.URL().String()

	for {
		req, err := r.newHTTPRequest(ctx)
		if err != nil {
			return nil, err
		}

		r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
		if retryAfter != nil {
			// This is a retry of a request that was already sent to the
			// apiserver at least once, so it is also throttled with the
			// client-internal rate limiter.
			if throttleErr := r.tryThrottleWithInfo(ctx, retryAfter.Reason); throttleErr != nil {
				return nil, throttleErr
			}
			retryAfter = nil
		}

		resp, err := httpClient.Do(req)
		updateURLMetrics(ctx, r, resp, err)
		if r.c.base != nil {
			// Without a response there is no status to report, so 0 is used.
			statusCode := 0
			if err == nil {
				statusCode = resp.StatusCode
			}
			r.backoff.UpdateBackoff(r.c.base, err, statusCode)
		}
		if err == nil && resp.StatusCode == http.StatusOK {
			return r.newStreamWatcher(resp)
		}

		// The closure scopes the deferred body cleanup to this single
		// attempt. It reports whether the loop is finished and, if so, the
		// error derived from the response (nil when there was no response).
		finished, respErr := func() (bool, error) {
			defer readAndCloseResponseBody(resp)

			var shouldRetry bool
			retryAfter, shouldRetry = r.retry.NextRetry(req, resp, err, isRetryable)
			if shouldRetry {
				retryErr := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, target, r.body)
				if retryErr == nil {
					return false, nil
				}
				klog.V(4).Infof("Could not retry request - %v", retryErr)
			}

			if resp == nil {
				// No response at all: the failure is carried by 'err'.
				return true, nil
			}
			if result := r.transformResponse(resp, req); result.err != nil {
				return true, result.err
			}
			return true, fmt.Errorf("for request %s, got status: %v", target, resp.StatusCode)
		}()
		if !finished {
			continue
		}

		switch {
		case isRetryable(req, err):
			return watch.NewEmptyWatch(), nil
		case err != nil:
			return nil, err
		default:
			// The server sent an HTTP response object, so the error
			// extracted from that response is what gets returned.
			return nil, respErr
		}
	}
}