request/requester.go (79 lines of code) (raw):
package request
import (
"context"
"fmt"
"io"
"net/url"
"reflect"
"time"
_ "unsafe" // unsafe to use internal function from client-go
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/clock"
)
type Requester interface {
Method() string
URL() *url.URL
Timeout(time.Duration)
Do(context.Context) (bytes int64, err error)
}
type BaseRequester struct {
method string
req *rest.Request
}
func (reqr *BaseRequester) Method() string {
return reqr.method
}
func (reqr *BaseRequester) URL() *url.URL {
return reqr.req.URL()
}
func (reqr *BaseRequester) Timeout(timeout time.Duration) {
reqr.req.Timeout(timeout)
}
type DiscardRequester struct {
BaseRequester
}
func (reqr *DiscardRequester) Do(ctx context.Context) (bytes int64, err error) {
respBody, err := reqr.req.Stream(ctx)
if err != nil {
return 0, err
}
defer respBody.Close()
return io.Copy(io.Discard, respBody)
}
type WatchListRequester struct {
BaseRequester
}
func (reqr *WatchListRequester) Do(ctx context.Context) (zero int64, _ error) {
cl := clock.RealClock{}
temporaryStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
start := time.Now()
w, err := reqr.req.Watch(ctx)
if err != nil {
return zero, err
}
watchListBookmarkReceived, err := handleAnyWatch(start, w, temporaryStore, nil, nil, "", "", func(_ string) {}, true, cl, make(chan error), ctx.Done())
w.Stop()
if err != nil {
return zero, err
}
if watchListBookmarkReceived {
return zero, nil
}
return zero, fmt.Errorf("don't receive bookmark")
}
//go:linkname handleAnyWatch k8s.io/client-go/tools/cache.handleAnyWatch
func handleAnyWatch(start time.Time,
w watch.Interface,
store cache.Store,
expectedType reflect.Type,
expectedGVK *schema.GroupVersionKind,
name string,
expectedTypeName string,
setLastSyncResourceVersion func(string),
exitOnWatchListBookmarkReceived bool,
clock clock.Clock,
errCh chan error,
stopCh <-chan struct{},
) (bool, error)