contrib/utils/utils.go
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package utils
import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"os"
"sort"
"strconv"
"strings"
"time"
"github.com/Azure/kperf/api/types"
"github.com/Azure/kperf/contrib/internal/manifests"
"github.com/Azure/kperf/contrib/log"
"github.com/Azure/kperf/helmcli"
"gopkg.in/yaml.v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
var (
// EKSIdleNodepoolInstanceType is the instance type of the idle nodepool.
//
// NOTE: The EKS cloud provider deletes all NOT-READY nodes that aren't
// managed by it. When the kwok-controller fails to update a virtual
// node's lease, the EKS cloud provider would delete that virtual node,
// which is unexpected. To avoid this, we create an idle nodepool with
// one node and use that node's provider ID for all the virtual nodes so
// that the EKS cloud provider won't delete them.
EKSIdleNodepoolInstanceType = "m4.large"
)
// RepeatJobWithPod repeatedly creates a job with 3k pods, waits for it to finish, and deletes it.
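//
// A minimal usage sketch; the kubeconfig path, namespace, and manifest target
// below are illustrative placeholders, not values defined in this package:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel()
//	// The call blocks until ctx is canceled, so it usually runs in a goroutine.
//	go RepeatJobWithPod(ctx, "/tmp/kubeconfig", "job-ns", "workload/example-job.yaml", 10*time.Second)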
func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string, target string, interval time.Duration) {
infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
infoLogger.LogKV("msg", "repeat to create job with 3k pods")
data, err := manifests.FS.ReadFile(target)
if err != nil {
panic(fmt.Errorf("unexpected error when read %s from embed memory: %v",
target, err))
}
jobFile, cleanup, err := CreateTempFileWithContent(data)
if err != nil {
panic(fmt.Errorf("unexpected error when create job yaml: %v", err))
}
defer func() { _ = cleanup() }()
kr := NewKubectlRunner(kubeCfgPath, namespace)
infoLogger.LogKV("msg", "creating namespace", "name", namespace)
err = kr.CreateNamespace(ctx, 5*time.Minute, namespace)
if err != nil {
panic(fmt.Errorf("failed to create a new namespace %s: %v", namespace, err))
}
defer func() {
infoLogger.LogKV("msg", "cleanup namespace", "name", namespace)
err := kr.DeleteNamespace(context.TODO(), 60*time.Minute, namespace)
if err != nil {
warnLogger.LogKV("msg", "failed to cleanup namespace", "name", namespace, "error", err)
}
}()
retryInterval := 5 * time.Second
for {
select {
case <-ctx.Done():
infoLogger.LogKV("msg", "stop creating job")
return
default:
}
time.Sleep(retryInterval)
aerr := kr.Apply(ctx, 5*time.Minute, jobFile)
if aerr != nil {
warnLogger.LogKV("msg", "failed to apply job, retry after 5 seconds", "job", target, "error", aerr)
continue
}
werr := kr.Wait(ctx, 15*time.Minute, "condition=complete", "15m", "job/batchjobs")
if werr != nil {
warnLogger.LogKV("msg", "failed to wait job finish", "job", target, "error", werr)
}
derr := kr.Delete(ctx, 5*time.Minute, jobFile)
if derr != nil {
warnLogger.LogKV("msg", "failed to delete job", "job", target, "error", derr)
}
time.Sleep(interval)
}
}
// DeployAndRepeatRollingUpdateDeployments deploys deployments and returns a function that repeatedly rolling-updates them, plus a cleanup function.
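//
// A minimal usage sketch (the kubeconfig path and release name are
// illustrative placeholders):
//
//	rollingUpdateFn, cleanupFn, err := DeployAndRepeatRollingUpdateDeployments(
//		ctx, "/tmp/kubeconfig", "benchmark-deployments", 10, 3, 1024, 10*time.Second)
//	if err != nil {
//		return err
//	}
//	defer cleanupFn()
//	go rollingUpdateFn()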
func DeployAndRepeatRollingUpdateDeployments(
ctx context.Context,
kubeCfgPath string,
releaseName string,
total, replica, paddingBytes int,
interval time.Duration,
) (rollingUpdateFn, cleanupFn func(), retErr error) {
infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
target := "workload/deployments"
ch, err := manifests.LoadChart(target)
if err != nil {
return nil, nil, fmt.Errorf("failed to load %s chart: %w", target, err)
}
namePattern := releaseName
releaseCli, err := helmcli.NewReleaseCli(
kubeCfgPath,
// NOTE: The deployments have fixed namespace names, so "default" here
// only fills the required namespace argument for NewReleaseCli.
"default",
releaseName,
ch,
nil,
helmcli.StringPathValuesApplier(
fmt.Sprintf("namePattern=%s", namePattern),
fmt.Sprintf("total=%d", total),
fmt.Sprintf("replica=%d", replica),
fmt.Sprintf("paddingBytes=%d", paddingBytes),
),
)
if err != nil {
return nil, nil, fmt.Errorf("failed to create a new helm release cli: %w", err)
}
infoLogger.LogKV(
"msg", "deploying deployments",
"total", total,
"replica", replica,
"paddingBytes", paddingBytes,
)
err = releaseCli.Deploy(ctx, 10*time.Minute)
if err != nil {
if errors.Is(err, context.Canceled) {
infoLogger.LogKV("msg", "deploy is canceled")
return func() {}, func() {}, nil
}
return nil, nil, fmt.Errorf("failed to deploy helm chart %s: %w", target, err)
}
infoLogger.LogKV("msg", "deployed deployments")
cleanupFn = func() {
infoLogger.LogKV("msg", "cleanup helm chart", "target", target)
err := releaseCli.Uninstall()
if err != nil {
warnLogger.LogKV("msg", "failed to cleanup helm chart",
"target", target,
"error", err)
}
}
rollingUpdateFn = func() {
for {
select {
case <-ctx.Done():
infoLogger.LogKV("msg", "stop rolling-updating")
return
case <-time.After(interval):
}
infoLogger.LogKV("msg", "start to rolling-update deployments")
for i := 0; i < total; i++ {
name := fmt.Sprintf("%s-%d", namePattern, i)
ns := name
infoLogger.LogKV("msg", "rolling-update deployment", "name", name, "namespace", ns)
err := func() error {
kr := NewKubectlRunner(kubeCfgPath, ns)
err := kr.DeploymentRestart(ctx, 2*time.Minute, name)
if err != nil {
return fmt.Errorf("failed to restart deployment %s: %w", name, err)
}
err = kr.DeploymentRolloutStatus(ctx, 10*time.Minute, name)
if err != nil {
return fmt.Errorf("failed to watch the rollout status of deployment %s: %w", name, err)
}
return nil
}()
if err != nil {
warnLogger.LogKV("msg", "failed to rolling-update",
"error", err,
"deployment", name,
"namespace", ns)
}
}
}
}
return rollingUpdateFn, cleanupFn, nil
}
// NewRunnerGroupSpecFromYAML returns a RunnerGroupSpec instance from YAML data.
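//
// A minimal usage sketch; the tweak function may mutate the decoded spec
// before it is returned (field access is omitted since it depends on the
// types.RunnerGroupSpec definition):
//
//	spec, err := NewRunnerGroupSpecFromYAML(rawYAML, func(s *types.RunnerGroupSpec) error {
//		// mutate *s here, e.g. override the runner count
//		return nil
//	})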
func NewRunnerGroupSpecFromYAML(data []byte, tweakFn func(*types.RunnerGroupSpec) error) (*types.RunnerGroupSpec, error) {
var spec types.RunnerGroupSpec
if err := yaml.Unmarshal(data, &spec); err != nil {
return nil, fmt.Errorf("failed to unmarshal into RunnerGroupSpec:\n (data: %s)\n: %w",
string(data), err)
}
if tweakFn != nil {
if err := tweakFn(&spec); err != nil {
return nil, fmt.Errorf("failed to tweak RunnerGroupSpec: %w", err)
}
}
return &spec, nil
}
// NewRunnerGroupSpecFileFromEmbed reads a load profile (RunnerGroupSpec) from
// the embedded filesystem and marshals it into a temporary file. Use it when
// invoking the kperf binary instead of the package.
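//
// A minimal usage sketch (the embedded target path is an illustrative
// placeholder):
//
//	specFile, cleanup, err := NewRunnerGroupSpecFileFromEmbed("loadprofile/example.yaml", nil)
//	if err != nil {
//		return err
//	}
//	defer func() { _ = cleanup() }()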
func NewRunnerGroupSpecFileFromEmbed(target string, tweakFn func(*types.RunnerGroupSpec) error) (_name string, _cleanup func() error, _ error) {
data, err := manifests.FS.ReadFile(target)
if err != nil {
return "", nil, fmt.Errorf("unexpected error when read %s from embed memory: %v", target, err)
}
if tweakFn != nil {
var spec types.RunnerGroupSpec
if err = yaml.Unmarshal(data, &spec); err != nil {
return "", nil, fmt.Errorf("failed to unmarshal into RunnerGroupSpec:\n (data: %s)\n: %w",
string(data), err)
}
if err = tweakFn(&spec); err != nil {
return "", nil, err
}
data, err = yaml.Marshal(spec)
if err != nil {
return "", nil, fmt.Errorf("failed to marshal RunnerGroupSpec after tweak: %w", err)
}
}
return CreateTempFileWithContent(data)
}
// DeployRunnerGroup deploys a runner group for the benchmark.
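//
// A minimal usage sketch; the image, flow-control, and affinity values are
// illustrative placeholders, not formats defined in this file:
//
//	// rgCfgFile typically comes from NewRunnerGroupSpecFileFromEmbed.
//	report, err := DeployRunnerGroup(ctx,
//		"/tmp/kubeconfig",
//		"example.registry.io/kperf:latest",
//		rgCfgFile,
//		"workload-low:1000",
//		"node.kubernetes.io/instance-type=m4.large",
//	)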
func DeployRunnerGroup(ctx context.Context,
kubeCfgPath, runnerImage, rgCfgFile string,
runnerFlowControl, runnerGroupAffinity string) (*types.RunnerGroupsReport, error) {
infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
kr := NewKperfRunner(kubeCfgPath, runnerImage)
infoLogger.LogKV("msg", "deleting existing runner group")
derr := kr.RGDelete(ctx, 0)
if derr != nil {
return nil, fmt.Errorf("failed to delete existing runner group: %w", derr)
}
infoLogger.LogKV("msg", "deploying runner group")
rerr := kr.RGRun(ctx, 0, rgCfgFile, runnerFlowControl, runnerGroupAffinity)
if rerr != nil {
return nil, fmt.Errorf("failed to deploy runner group: %w", rerr)
}
infoLogger.LogKV("msg", "start to wait runner group")
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// NOTE: The result subcommand holds a long connection until the
// runner-group's server replies. However, no data is transferred
// before the runners finish. If the apiserver has been restarted,
// the proxy tunnel will be broken and the client won't be notified,
// so the client would hang forever. The 1-minute timeout ensures we
// notice a broken connection, retry, and still get the result in time.
data, err := kr.RGResult(ctx, 1*time.Minute)
if err != nil {
// FIXME(weifu): If the pod is not found, we should return fast.
// However, maintaining error-string matching is fragile. We should
// use a specific command-line error code or call the package
// instead of the binary.
if strings.Contains(err.Error(), `pods "runnergroup-server" not found`) {
return nil, err
}
warnLogger.LogKV("msg", fmt.Errorf("failed to fetch runner group's result: %w", err))
continue
}
infoLogger.LogKV("msg", "dump RunnerGroupsReport", "data", data)
var rgResult types.RunnerGroupsReport
if err = json.Unmarshal([]byte(data), &rgResult); err != nil {
return nil, fmt.Errorf("failed to unmarshal into RunnerGroupsReport: %w", err)
}
infoLogger.LogKV("msg", "deleting runner group")
if derr := kr.RGDelete(ctx, 0); derr != nil {
warnLogger.LogKV("msg", "failed to delete runner group", "err", err)
}
return &rgResult, nil
}
}
// FetchAPIServerCores fetches the core number for each kube-apiserver.
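//
// A minimal usage sketch that sums the cores across all apiserver instances:
//
//	cores, err := FetchAPIServerCores(ctx, "/tmp/kubeconfig")
//	if err != nil {
//		return err
//	}
//	total := 0
//	for _, n := range cores {
//		total += n
//	}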
func FetchAPIServerCores(ctx context.Context, kubeCfgPath string) (map[string]int, error) {
logger := log.GetLogger(ctx)
logger.WithKeyValues("level", "info").LogKV("msg", "fetching apiserver's cores")
kr := NewKubectlRunner(kubeCfgPath, "")
fqdn, err := kr.FQDN(ctx, 0)
if err != nil {
return nil, fmt.Errorf("failed to get cluster fqdn: %w", err)
}
ips, nerr := NSLookup(fqdn)
if nerr != nil {
return nil, fmt.Errorf("failed get dns records of fqdn %s: %w", fqdn, nerr)
}
res := map[string]int{}
for _, ip := range ips {
cores, err := func() (int, error) {
data, err := kr.Metrics(ctx, 0, fqdn, ip)
if err != nil {
return 0, fmt.Errorf("failed to get metrics for ip %s: %w", ip, err)
}
lines := strings.Split(string(data), "\n")
for _, line := range lines {
if strings.HasPrefix(line, "go_sched_gomaxprocs_threads") {
vInStr := strings.Fields(line)[1]
v, err := strconv.Atoi(vInStr)
if err != nil {
return 0, fmt.Errorf("failed to parse go_sched_gomaxprocs_threads %s: %w", line, err)
}
return v, nil
}
}
return 0, fmt.Errorf("failed to get go_sched_gomaxprocs_threads")
}()
if err != nil {
logger.WithKeyValues("level", "warn").LogKV("msg", "failed to get cores", "ip", ip, "error", err)
continue
}
logger.WithKeyValues("level", "info").LogKV("msg", "fetched apiserver cores", "ip", ip, "cores", cores)
res[ip] = cores
}
return res, nil
}
// FetchNodeProviderIDByType returns one node's provider ID for a given
// instance type.
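//
// A minimal usage sketch, using the idle nodepool instance type defined above:
//
//	providerID, err := FetchNodeProviderIDByType(ctx, "/tmp/kubeconfig", EKSIdleNodepoolInstanceType)
//	if err != nil {
//		return err
//	}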
func FetchNodeProviderIDByType(ctx context.Context, kubeCfgPath string, instanceType string) (string, error) {
clientset, err := BuildClientset(kubeCfgPath)
if err != nil {
return "", err
}
label := fmt.Sprintf("node.kubernetes.io/instance-type=%v", instanceType)
nodeCli := clientset.CoreV1().Nodes()
listResp, err := nodeCli.List(ctx, metav1.ListOptions{LabelSelector: label})
if err != nil {
return "", fmt.Errorf("failed to list nodes with label %s: %w", label, err)
}
if len(listResp.Items) == 0 {
return "", fmt.Errorf("there is no such node with label %s", label)
}
return listResp.Items[0].Spec.ProviderID, nil
}
// BuildClientset returns kubernetes clientset.
func BuildClientset(kubeCfgPath string) (*kubernetes.Clientset, error) {
config, err := clientcmd.BuildConfigFromFlags("", kubeCfgPath)
if err != nil {
return nil, fmt.Errorf("failed to build client-go config: %w", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to build client-go rest client: %w", err)
}
return clientset, nil
}
// NSLookup returns the sorted IP addresses for the given domain.
func NSLookup(domainURL string) ([]string, error) {
ips, err := net.LookupHost(domainURL)
if err != nil {
return nil, err
}
sort.Strings(ips)
return ips, nil
}
// runCommand runs the command with Pdeathsig set.
func runCommand(ctx context.Context, timeout time.Duration, cmd string, args []string) ([]byte, error) {
logger := log.GetLogger(ctx)
var cancel context.CancelFunc
if timeout != 0 {
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
c := newExecCommand(ctx, cmd, args...)
logger.WithKeyValues("level", "info").LogKV("msg", "start command", "cmd", c.String())
output, err := c.CombinedOutput()
if err != nil {
return nil, fmt.Errorf("failed to invoke %s:\n (output: %s): %w",
c.String(), strings.TrimSpace(string(output)), err)
}
return output, nil
}
// runCommandWithInput executes a command with `input` piped through stdin.
func runCommandWithInput(ctx context.Context, timeout time.Duration, cmd string, args []string, input string) ([]byte, error) {
logger := log.GetLogger(ctx)
var cancel context.CancelFunc
if timeout != 0 {
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
c := newExecCommand(ctx, cmd, args...)
c.Stdin = strings.NewReader(input)
logger.WithKeyValues("level", "info").LogKV("msg", "start command", "cmd", c.String())
output, err := c.CombinedOutput()
if err != nil {
return nil, fmt.Errorf("failed to invoke %s:\n (output: %s): %w",
c.String(), strings.TrimSpace(string(output)), err)
}
return output, nil
}
// CreateTempFileWithContent creates a temporary file with the given data and returns the file name and a cleanup function.
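//
// A minimal usage sketch:
//
//	name, cleanup, err := CreateTempFileWithContent([]byte("example content"))
//	if err != nil {
//		return err
//	}
//	defer func() { _ = cleanup() }()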
func CreateTempFileWithContent(data []byte) (_name string, _cleanup func() error, retErr error) {
f, err := os.CreateTemp("", "temp*")
if err != nil {
return "", nil, fmt.Errorf("failed to create temporary file: %w", err)
}
fName := f.Name()
defer func() {
if retErr != nil {
_ = os.RemoveAll(fName)
}
}()
_, err = f.Write(data)
f.Close()
if err != nil {
return "", nil, fmt.Errorf("failed to write temporary in %s: %w",
fName, err)
}
return fName, func() error {
return os.RemoveAll(fName)
}, nil
}