request/random.go (301 lines of code) (raw):
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package request
import (
"context"
"crypto/rand"
"fmt"
"math/big"
"sync"
"github.com/Azure/kperf/api/types"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
)
// WeightedRandomRequests is used to generate requests based on LoadProfileSpec.
type WeightedRandomRequests struct {
once sync.Once
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
reqBuilderCh chan RESTRequestBuilder
shares []int
reqBuilders []RESTRequestBuilder
}
// NewWeightedRandomRequests creates new instance of WeightedRandomRequests.
func NewWeightedRandomRequests(spec *types.LoadProfileSpec) (*WeightedRandomRequests, error) {
if err := spec.Validate(); err != nil {
return nil, fmt.Errorf("invalid load profile spec: %v", err)
}
shares := make([]int, 0, len(spec.Requests))
reqBuilders := make([]RESTRequestBuilder, 0, len(spec.Requests))
for _, r := range spec.Requests {
shares = append(shares, r.Shares)
var builder RESTRequestBuilder
switch {
case r.StaleList != nil:
builder = newRequestListBuilder(r.StaleList, "0", spec.MaxRetries)
case r.QuorumList != nil:
builder = newRequestListBuilder(r.QuorumList, "", spec.MaxRetries)
case r.WatchList != nil:
builder = newRequestWatchListBuilder(r.WatchList, spec.MaxRetries)
case r.StaleGet != nil:
builder = newRequestGetBuilder(r.StaleGet, "0", spec.MaxRetries)
case r.QuorumGet != nil:
builder = newRequestGetBuilder(r.QuorumGet, "", spec.MaxRetries)
case r.GetPodLog != nil:
builder = newRequestGetPodLogBuilder(r.GetPodLog, spec.MaxRetries)
default:
return nil, fmt.Errorf("not implement for PUT yet")
}
reqBuilders = append(reqBuilders, builder)
}
ctx, cancel := context.WithCancel(context.Background())
return &WeightedRandomRequests{
ctx: ctx,
cancel: cancel,
reqBuilderCh: make(chan RESTRequestBuilder),
shares: shares,
reqBuilders: reqBuilders,
}, nil
}
// Run starts to random pick request.
func (r *WeightedRandomRequests) Run(ctx context.Context, total int) {
defer r.wg.Done()
r.wg.Add(1)
sum := 0
for sum < total {
builder := r.randomPick()
select {
case r.reqBuilderCh <- builder:
sum++
case <-r.ctx.Done():
return
case <-ctx.Done():
return
}
}
}
// Chan returns channel to get random request.
func (r *WeightedRandomRequests) Chan() chan RESTRequestBuilder {
return r.reqBuilderCh
}
func (r *WeightedRandomRequests) randomPick() RESTRequestBuilder {
sum := 0
for _, s := range r.shares {
sum += s
}
rndInt, err := rand.Int(rand.Reader, big.NewInt(int64(sum)))
if err != nil {
panic(err)
}
rnd := rndInt.Int64()
for i := range r.shares {
s := int64(r.shares[i])
if rnd < s {
return r.reqBuilders[i]
}
rnd -= s
}
panic("unreachable")
}
// Stop stops request generator.
func (r *WeightedRandomRequests) Stop() {
r.once.Do(func() {
r.cancel()
r.wg.Wait()
close(r.reqBuilderCh)
})
}
// RESTRequestBuilder is used to build rest.Request.
type RESTRequestBuilder interface {
Build(cli rest.Interface) Requester
}
type requestGetBuilder struct {
version schema.GroupVersion
resource string
namespace string
name string
resourceVersion string
maxRetries int
}
func newRequestGetBuilder(src *types.RequestGet, resourceVersion string, maxRetries int) *requestGetBuilder {
return &requestGetBuilder{
version: schema.GroupVersion{
Group: src.Group,
Version: src.Version,
},
resource: src.Resource,
namespace: src.Namespace,
name: src.Name,
resourceVersion: resourceVersion,
maxRetries: maxRetries,
}
}
// Build implements RequestBuilder.Build.
func (b *requestGetBuilder) Build(cli rest.Interface) Requester {
// https://kubernetes.io/docs/reference/using-api/#api-groups
comps := make([]string, 0, 5)
if b.version.Group == "" {
comps = append(comps, "api", b.version.Version)
} else {
comps = append(comps, "apis", b.version.Group, b.version.Version)
}
comps = append(comps, b.resource, b.name)
return &DiscardRequester{
BaseRequester: BaseRequester{
method: "GET",
req: cli.Get().AbsPath(comps...).
SpecificallyVersionedParams(
&metav1.GetOptions{ResourceVersion: b.resourceVersion},
scheme.ParameterCodec,
schema.GroupVersion{Version: "v1"},
).MaxRetries(b.maxRetries),
},
}
}
type requestListBuilder struct {
version schema.GroupVersion
resource string
namespace string
limit int64
labelSelector string
fieldSelector string
resourceVersion string
maxRetries int
}
func newRequestListBuilder(src *types.RequestList, resourceVersion string, maxRetries int) *requestListBuilder {
return &requestListBuilder{
version: schema.GroupVersion{
Group: src.Group,
Version: src.Version,
},
resource: src.Resource,
namespace: src.Namespace,
limit: int64(src.Limit),
labelSelector: src.Selector,
fieldSelector: src.FieldSelector,
resourceVersion: resourceVersion,
maxRetries: maxRetries,
}
}
// Build implements RequestBuilder.Build.
func (b *requestListBuilder) Build(cli rest.Interface) Requester {
// https://kubernetes.io/docs/reference/using-api/#api-groups
comps := make([]string, 0, 5)
if b.version.Group == "" {
comps = append(comps, "api", b.version.Version)
} else {
comps = append(comps, "apis", b.version.Group, b.version.Version)
}
if b.namespace != "" {
comps = append(comps, "namespaces", b.namespace)
}
comps = append(comps, b.resource)
return &DiscardRequester{
BaseRequester: BaseRequester{
method: "LIST",
req: cli.Get().AbsPath(comps...).
SpecificallyVersionedParams(
&metav1.ListOptions{
LabelSelector: b.labelSelector,
FieldSelector: b.fieldSelector,
ResourceVersion: b.resourceVersion,
Limit: b.limit,
},
scheme.ParameterCodec,
schema.GroupVersion{Version: "v1"},
).MaxRetries(b.maxRetries),
},
}
}
type requestWatchListBuilder struct {
version schema.GroupVersion
resource string
namespace string
labelSelector string
fieldSelector string
maxRetries int
}
func newRequestWatchListBuilder(src *types.RequestWatchList, maxRetries int) *requestWatchListBuilder {
return &requestWatchListBuilder{
version: schema.GroupVersion{
Group: src.Group,
Version: src.Version,
},
resource: src.Resource,
namespace: src.Namespace,
labelSelector: src.Selector,
fieldSelector: src.FieldSelector,
maxRetries: maxRetries,
}
}
// Build implements RequestBuilder.Build.
func (b *requestWatchListBuilder) Build(cli rest.Interface) Requester {
// https://kubernetes.io/docs/reference/using-api/#api-groups
comps := make([]string, 0, 5)
if b.version.Group == "" {
comps = append(comps, "api", b.version.Version)
} else {
comps = append(comps, "apis", b.version.Group, b.version.Version)
}
if b.namespace != "" {
comps = append(comps, "namespaces", b.namespace)
}
comps = append(comps, b.resource)
return &WatchListRequester{
BaseRequester: BaseRequester{
method: "WATCHLIST",
req: cli.Get().AbsPath(comps...).
SpecificallyVersionedParams(
&metav1.ListOptions{
LabelSelector: b.labelSelector,
FieldSelector: b.fieldSelector,
ResourceVersion: "",
Watch: true,
SendInitialEvents: toPtr(true),
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
AllowWatchBookmarks: true,
},
scheme.ParameterCodec,
schema.GroupVersion{Version: "v1"},
).MaxRetries(b.maxRetries),
},
}
}
type requestGetPodLogBuilder struct {
namespace string
name string
container string
tailLines *int64
limitBytes *int64
maxRetries int
}
func newRequestGetPodLogBuilder(src *types.RequestGetPodLog, maxRetries int) *requestGetPodLogBuilder {
b := &requestGetPodLogBuilder{
namespace: src.Namespace,
name: src.Name,
container: src.Container,
maxRetries: maxRetries,
}
if src.TailLines != nil {
b.tailLines = toPtr(*src.TailLines)
}
if src.LimitBytes != nil {
b.limitBytes = toPtr(*src.LimitBytes)
}
return b
}
// Build implements RequestBuilder.Build.
func (b *requestGetPodLogBuilder) Build(cli rest.Interface) Requester {
// https://kubernetes.io/docs/reference/using-api/#api-groups
apiPath, version := "api", "v1"
comps := make([]string, 2, 7)
comps[0], comps[1] = apiPath, version
comps = append(comps, "namespaces", b.namespace)
comps = append(comps, "pods", b.name, "log")
return &DiscardRequester{
BaseRequester: BaseRequester{
method: "POD_LOG",
req: cli.Get().AbsPath(comps...).
SpecificallyVersionedParams(
&corev1.PodLogOptions{
Container: b.container,
TailLines: b.tailLines,
LimitBytes: b.limitBytes,
},
scheme.ParameterCodec,
schema.GroupVersion{Version: "v1"},
).MaxRetries(b.maxRetries),
},
}
}
func toPtr[T any](v T) *T {
return &v
}