runner/group/handler.go
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package group
import (
"context"
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"time"
"github.com/Azure/kperf/api/types"
"gopkg.in/yaml.v2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
apitypes "k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)
var (
// errRetryable marks an error as retryable.
errRetryable = errors.New("retry")
)
// Handler runs a set of runners with the same load profile.
type Handler struct {
name string
namespace string
spec *types.RunnerGroupSpec
ownerRef *metav1.OwnerReference
// FIXME(weifu): should we migrate this field into RunnerGroupSpec?
imageRef string
clientset kubernetes.Interface
runnerVerbosity int
}
// NewHandler returns a new instance of Handler.
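//
// A minimal usage sketch (the clientset, spec, image reference and upload URL
// are assumed to be provided by the caller; the namespace, group name and
// verbosity below are hypothetical):
//
//	h, err := NewHandler(clientset, "kperf-runnergroup", "group-0", spec, imageRef, 2)
//	if err != nil {
//		return err
//	}
//	if err := h.Deploy(ctx, uploadURL); err != nil {
//		return err
//	}
//	if err := h.Wait(ctx); err != nil {
//		return err
//	}
//	status := h.Info(ctx)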
func NewHandler(
clientset kubernetes.Interface,
namespace, name string,
spec *types.RunnerGroupSpec,
imageRef string,
runnerVerbosity int,
) (*Handler, error) {
ownRef, err := buildOwnerReference(spec.OwnerReference)
if err != nil {
return nil, err
}
return &Handler{
name: name,
namespace: namespace,
spec: spec,
ownerRef: ownRef,
imageRef: imageRef,
clientset: clientset,
runnerVerbosity: runnerVerbosity,
}, nil
}
// Name returns the RunnerGroup's name.
func (h *Handler) Name() string {
return h.name
}
// Info returns RunnerGroup information with status.
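//
// The reported state is Finished when the job has completed, Unknown when the
// job has not started (or cannot be fetched), and Running otherwise.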
func (h *Handler) Info(ctx context.Context) *types.RunnerGroup {
rg := &types.RunnerGroup{
Name: h.name,
Spec: h.spec,
Status: &types.RunnerGroupStatus{
State: types.RunnerGroupStatusStateUnknown,
},
}
cli := h.clientset.BatchV1().Jobs(h.namespace)
job, err := cli.Get(ctx, h.name, metav1.GetOptions{})
if err != nil {
klog.V(2).ErrorS(err, "failed to get job for runner group", "job", h.name)
return rg
}
state := types.RunnerGroupStatusStateRunning
if jobFinished(job) {
state = types.RunnerGroupStatusStateFinished
} else if job.Status.StartTime == nil {
state = types.RunnerGroupStatusStateUnknown
}
rg.Status.State = state
rg.Status.StartTime = job.Status.StartTime
rg.Status.Succeeded = job.Status.Succeeded
rg.Status.Failed = job.Status.Failed
return rg
}
// Deploy deploys a group of runners.
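//
// It first ensures the load profile has been uploaded as a configmap and then
// creates the batch job that runs the runners; both steps reuse existing
// objects with the same name if they are already present.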
func (h *Handler) Deploy(ctx context.Context, uploadURL string) error {
if err := h.uploadLoadProfileAsConfigMap(ctx); err != nil {
return fmt.Errorf("failed to ensure load profile has been uploaded: %w", err)
}
return h.deployRunners(ctx, uploadURL)
}
// configMapDataKeyLoadProfile is the load profile's data key in the configmap.
var configMapDataKeyLoadProfile = "load_profile.yaml"
// uploadLoadProfileAsConfigMap stores the load profile as a configmap for the runners.
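//
// The configmap is immutable and, when an owner reference is configured, owned
// by it. A rough sketch of the created object (profile content elided):
//
//	apiVersion: v1
//	kind: ConfigMap
//	metadata:
//	  name: <runner group name>
//	  namespace: <runner group namespace>
//	immutable: true
//	data:
//	  load_profile.yaml: |
//	    <YAML-marshaled h.spec.Profile>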
func (h *Handler) uploadLoadProfileAsConfigMap(ctx context.Context) error {
cli := h.clientset.CoreV1().ConfigMaps(h.namespace)
cm, err := cli.Get(ctx, h.name, metav1.GetOptions{})
if err == nil {
// FIXME: should we check the content?
if _, ok := cm.Data[configMapDataKeyLoadProfile]; !ok {
return fmt.Errorf("configmap %s doesn't have load profile", h.name)
}
return nil
}
if !apierrors.IsNotFound(err) {
return err
}
raw, err := yaml.Marshal(h.spec.Profile)
if err != nil {
return fmt.Errorf("failed to marshal load profile into yaml: %w", err)
}
cm = &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: h.name,
Namespace: h.namespace,
},
Immutable: toPtr(true),
Data: map[string]string{
configMapDataKeyLoadProfile: string(raw),
},
}
if h.ownerRef != nil {
cm.OwnerReferences = append(cm.OwnerReferences, *h.ownerRef)
}
_, err = cli.Create(ctx, cm, metav1.CreateOptions{})
return err
}
// deployRunners deploys a group of runners as a batch job.
func (h *Handler) deployRunners(ctx context.Context, uploadURL string) error {
cli := h.clientset.BatchV1().Jobs(h.namespace)
_, err := cli.Get(ctx, h.name, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
_, err = cli.Create(ctx, h.buildBatchJobObject(uploadURL), metav1.CreateOptions{})
}
return err
}
// FIXME: should we check the content?
return nil
}
// Pods returns all the pods controlled by the job.
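//
// Pods are matched by the job-name labels that the Job controller applies to
// the pods it creates; the resulting selector is roughly
// "batch.kubernetes.io/job-name=<name>,job-name=<name>".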
func (h *Handler) Pods(ctx context.Context) ([]*corev1.Pod, error) {
selector, err := metav1.LabelSelectorAsSelector(
&metav1.LabelSelector{
MatchLabels: map[string]string{
"batch.kubernetes.io/job-name": h.name,
"job-name": h.name,
},
},
)
if err != nil {
return nil, fmt.Errorf("failed to create label selector: %w", err)
}
opts := metav1.ListOptions{
LabelSelector: selector.String(),
// NOTE:
//
// List pods from the watch cache to prevent the apiserver from
// listing all items from the etcd cluster.
ResourceVersion: "0",
}
pods, err := h.clientset.CoreV1().Pods(h.namespace).List(ctx, opts)
if err != nil {
return nil, fmt.Errorf("failed to list pods created by job %s: %w",
h.name, err)
}
res := make([]*corev1.Pod, 0, len(pods.Items))
for idx := range pods.Items {
pod := pods.Items[idx]
res = append(res, &pod)
}
return res, nil
}
// IsControlled returns true if the pod is controlled by the group.
func (h *Handler) IsControlled(ctx context.Context, podName string) (bool, error) {
// Fast path: the job's name is the prefix of its pods' names.
if !strings.HasPrefix(podName, h.name) {
return false, nil
}
pods, err := h.Pods(ctx)
if err != nil {
return false, err
}
for _, pod := range pods {
if pod.Name == podName {
return true, nil
}
}
return false, nil
}
// Wait waits for the runners to finish.
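//
// If the job has not finished yet, Wait watches it by name, resuming from the
// last seen resourceVersion. Transient failures (connection refused, too many
// requests, internal errors, closed watch channels) are retried with
// exponential backoff; an expired or gone resourceVersion resets the watch and
// starts over.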
func (h *Handler) Wait(ctx context.Context) error {
cli := h.clientset.BatchV1().Jobs(h.namespace)
job, err := cli.Get(ctx, h.name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get job %s from namespace %s: %w",
h.name, h.namespace, err)
}
if jobFinished(job) {
return nil
}
// NOTE: The backoff parameters align with the client-go reflector. Please
// check out the following reference for detail.
//
// https://github.com/kubernetes/client-go/blob/v0.28.4/tools/cache/reflector.go#L219
//
// TODO(weifu): fix staticcheck check
//
//nolint:staticcheck
backoff := wait.NewExponentialBackoffManager(
800*time.Millisecond, 30*time.Second, 2*time.Minute,
2.0, 1.0, &clock.RealClock{})
lastRv := job.ResourceVersion
fieldSelector := fields.OneTermEqualSelector(metav1.ObjectNameField, h.name).String()
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
opts := metav1.ListOptions{
FieldSelector: fieldSelector,
ResourceVersion: lastRv,
AllowWatchBookmarks: true,
}
w, err := cli.Watch(ctx, opts)
if err != nil {
// should retry if apiserver is down or unavailable.
if utilnet.IsConnectionRefused(err) ||
apierrors.IsTooManyRequests(err) ||
apierrors.IsInternalError(err) {
<-backoff.Backoff().C()
continue
}
return fmt.Errorf("failed to initialize watch for job %s: %w", h.name, err)
}
err = h.waitForJob(ctx, w, &lastRv)
if err != nil {
switch {
case apierrors.IsResourceExpired(err) || apierrors.IsGone(err):
klog.V(2).Infof("reset last seen resourceVersion and continue after receiving: %v", err)
lastRv = ""
continue
// should retry if apiserver is down or unavailable.
case apierrors.IsTooManyRequests(err) || apierrors.IsInternalError(err):
<-backoff.Backoff().C()
continue
case errors.Is(err, errRetryable):
<-backoff.Backoff().C()
continue
default:
return err
}
}
return nil
}
}
// waitForJob returns once the job finishes.
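//
// It drains events from the watch until the job reports completion: a closed
// result channel is surfaced as errRetryable, watch.Error events are converted
// into API errors, objects of unexpected types are skipped, and the last seen
// resourceVersion is written back through rv so the caller can resume.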
func (h *Handler) waitForJob(ctx context.Context, w watch.Interface, rv *string) error {
defer w.Stop()
expectedType := reflect.TypeOf(&batchv1.Job{})
for {
select {
case <-ctx.Done():
return ctx.Err()
case event, ok := <-w.ResultChan():
if !ok {
return fmt.Errorf("unexpected closed watch channel: %w", errRetryable)
}
if event.Type == watch.Error {
return apierrors.FromObject(event.Object)
}
obj := event.Object
if typ := reflect.TypeOf(obj); typ != expectedType {
klog.V(2).Infof("unexpected type: %v", typ)
continue
}
job := obj.(*batchv1.Job)
switch event.Type {
case watch.Modified:
klog.V(5).Infof("Job %s Expected: %v, Failed: %v, Succeeded: %v",
job.Name, *job.Spec.Completions, job.Status.Failed, job.Status.Succeeded)
if jobFinished(job) {
return nil
}
default:
klog.V(2).Infof("received event type %s", event.Type)
}
*rv = job.ResourceVersion
}
}
}
// buildBatchJobObject builds the job object that runs the runners.
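//
// The job is an indexed job with parallelism == completions == spec.Count,
// backoffLimit 0 and restartPolicy Never. Each runner pod mounts the load
// profile configmap at /config and the host's /tmp at /data, and receives
// POD_NAME, POD_NAMESPACE and POD_UID through the downward API plus TARGET_URL
// and RUNNER_VERBOSITY as plain environment variables. A rough sketch of the
// result (most fields elided):
//
//	apiVersion: batch/v1
//	kind: Job
//	metadata:
//	  name: <runner group name>
//	spec:
//	  completionMode: Indexed
//	  parallelism: <spec.Count>
//	  completions: <spec.Count>
//	  backoffLimit: 0
//	  template:
//	    spec:
//	      restartPolicy: Never
//	      containers:
//	      - name: runner
//	        image: <imageRef>
//	        command: ["/run_runner.sh"]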
func (h *Handler) buildBatchJobObject(uploadURL string) *batchv1.Job {
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: h.name,
Namespace: h.namespace,
},
Spec: batchv1.JobSpec{
Parallelism: toPtr(h.spec.Count),
Completions: toPtr(h.spec.Count),
BackoffLimit: toPtr(int32(0)),
// FIXME: Should not re-create pod
CompletionMode: toPtr(batchv1.IndexedCompletion),
Template: corev1.PodTemplateSpec{},
},
}
if h.ownerRef != nil {
job.OwnerReferences = append(job.OwnerReferences, *h.ownerRef)
}
job.Spec.Template.Spec = corev1.PodSpec{
Affinity: &corev1.Affinity{},
Containers: []corev1.Container{
{
Name: "runner",
Image: h.imageRef,
Env: []corev1.EnvVar{
{
Name: "POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: metav1.ObjectNameField,
},
},
},
{
Name: "POD_NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
{
Name: "POD_UID",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.uid",
},
},
},
{
Name: "TARGET_URL",
Value: uploadURL,
},
{
Name: "RUNNER_VERBOSITY",
Value: strconv.Itoa(h.runnerVerbosity),
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "config",
MountPath: "/config",
},
{
Name: "host-root-tmp",
MountPath: "/data",
},
},
Command: []string{
"/run_runner.sh",
},
},
},
RestartPolicy: corev1.RestartPolicyNever,
Volumes: []corev1.Volume{
{
Name: "config",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: h.name,
},
},
},
},
{
Name: "host-root-tmp",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/tmp",
},
},
},
},
}
if len(h.spec.NodeAffinity) > 0 {
matchExpressions := make([]corev1.NodeSelectorRequirement, 0, len(h.spec.NodeAffinity))
for key, values := range h.spec.NodeAffinity {
matchExpressions = append(matchExpressions, corev1.NodeSelectorRequirement{
Key: key,
Operator: corev1.NodeSelectorOpIn,
Values: values,
})
}
job.Spec.Template.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: matchExpressions,
},
},
},
}
}
if sa := h.spec.ServiceAccount; sa != nil {
job.Spec.Template.Spec.ServiceAccountName = *sa
}
return job
}
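// buildOwnerReference parses an owner reference encoded as
// "apiVersion:kind:name:uid", for example a hypothetical
// "v1:Pod:kperf-server:<uid>", and marks it as the controller.
// A nil ref yields a nil owner reference.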
func buildOwnerReference(ref *string) (*metav1.OwnerReference, error) {
if ref == nil {
return nil, nil
}
tokens := strings.SplitN(*ref, ":", 4)
if len(tokens) != 4 {
return nil, fmt.Errorf("owner reference %s is not in apiVersion:kind:name:uid format", *ref)
}
return &metav1.OwnerReference{
APIVersion: tokens[0],
Kind: tokens[1],
Name: tokens[2],
UID: apitypes.UID(tokens[3]),
Controller: toPtr(true),
}, nil
}
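// jobFinished reports whether the job has finished, i.e. the number of failed
// plus succeeded pods has reached spec.completions. Jobs built by
// buildBatchJobObject always set Completions, which the dereference below
// relies on.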
func jobFinished(job *batchv1.Job) bool {
return job.Status.Failed+job.Status.Succeeded == *job.Spec.Completions
}
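// toPtr returns a pointer to a copy of v.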
func toPtr[T any](v T) *T {
return &v
}