func()

in rest/request.go [671:757]


// Watch issues the request and, when the server answers with 200 OK, returns
// the watch.Interface built by r.newStreamWatcher for the response.
//
// If the request already carries an error (r.err), that error is returned
// without contacting the server. Failed attempts are handed to r.retry, which
// decides whether another attempt is made. Once no further attempt will be
// made:
//   - a probable-EOF or timeout error yields an empty watch and a nil error;
//   - any other transport error is returned unchanged;
//   - a non-200 response yields the error from r.transformResponse, or a
//     generic error carrying the request URL and status code.
func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
	// Watches are deliberately not rate limited up front, which is why
	// r.rateLimiter is not consulted before the first attempt.
	if r.err != nil {
		return nil, r.err
	}

	httpClient := r.c.Client
	if httpClient == nil {
		httpClient = http.DefaultClient
	}

	// The watch stream mechanism copes with many common partial-data errors,
	// so closed connections and timeouts are treated as retryable.
	retryableErr := func(_ *http.Request, err error) bool {
		return net.IsProbableEOF(err) || net.IsTimeout(err)
	}

	requestURL := r.URL().String()
	var retryAfter *RetryAfter
	for {
		req, reqErr := r.newHTTPRequest(ctx)
		if reqErr != nil {
			return nil, reqErr
		}

		delay := r.backoff.CalculateBackoff(r.URL())
		r.backoff.Sleep(delay)
		if retryAfter != nil {
			// This request has already been sent to the apiserver at least
			// once, so the repeat attempt goes through the client-internal
			// rate limiter.
			if throttleErr := r.tryThrottleWithInfo(ctx, retryAfter.Reason); throttleErr != nil {
				return nil, throttleErr
			}
			retryAfter = nil
		}

		resp, err := httpClient.Do(req)
		updateURLMetrics(ctx, r, resp, err)
		if base := r.c.base; base != nil {
			// A failed round trip has no response, so it is recorded with
			// status code 0.
			status := 0
			if err == nil {
				status = resp.StatusCode
			}
			r.backoff.UpdateBackoff(base, err, status)
		}
		if err == nil && resp.StatusCode == http.StatusOK {
			return r.newStreamWatcher(resp)
		}

		// The closure scopes the deferred body cleanup to this single
		// attempt instead of letting it pile up until Watch returns.
		finished, respErr := func() (bool, error) {
			defer readAndCloseResponseBody(resp)

			var shouldRetry bool
			retryAfter, shouldRetry = r.retry.NextRetry(req, resp, err, retryableErr)
			if shouldRetry {
				prepErr := r.retry.BeforeNextRetry(ctx, r.backoff, retryAfter, requestURL, r.body)
				if prepErr == nil {
					return false, nil
				}
				klog.V(4).Infof("Could not retry request - %v", prepErr)
			}

			if resp == nil {
				// No response at all: the failure is carried by err.
				return true, nil
			}
			if result := r.transformResponse(resp, req); result.err != nil {
				return true, result.err
			}
			return true, fmt.Errorf("for request %s, got status: %v", requestURL, resp.StatusCode)
		}()
		if !finished {
			continue
		}

		switch {
		case retryableErr(req, err):
			return watch.NewEmptyWatch(), nil
		case err != nil:
			return nil, err
		default:
			// The server sent an HTTP response, so report the error
			// derived from that response.
			return nil, respErr
		}
	}
}