internal/task/run_job.go (128 lines of code) (raw):
package task
import (
"context"
"fmt"
"os"
k8sbatchv1 "k8s.io/api/batch/v1"
k8scorev1 "k8s.io/api/core/v1"
k8smetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/Azure/k6ctl/internal/kubelib"
"github.com/Azure/k6ctl/internal/stdlib"
)
func (tr *taskRunner) buildJobObject() (*k8sbatchv1.Job, error) {
taskName := tr.taskConfig.Name
k6RunnerImage := tr.taskConfig.K6.PodImage
scriptToRun := tr.script
var envFrom []k8scorev1.EnvFromSource
if len(tr.taskConfig.Configs) > 0 {
envFrom = append(envFrom, k8scorev1.EnvFromSource{
SecretRef: &k8scorev1.SecretEnvSource{
LocalObjectReference: k8scorev1.LocalObjectReference{
Name: tr.configsSecretName(),
},
},
})
}
var volumes []k8scorev1.Volume
if len(tr.taskConfig.Files) > 0 {
volumes = append(volumes, k8scorev1.Volume{
Name: scriptsVolumeName,
VolumeSource: k8scorev1.VolumeSource{
ConfigMap: &k8scorev1.ConfigMapVolumeSource{
LocalObjectReference: k8scorev1.LocalObjectReference{
Name: tr.scriptsConfigMapName(),
},
},
},
})
}
rv := &k8sbatchv1.Job{
ObjectMeta: k8smetav1.ObjectMeta{
Name: tr.taskJobName(),
Namespace: tr.objectNamespace(),
Labels: map[string]string{
labelKeyTaskName: taskName,
},
},
Spec: k8sbatchv1.JobSpec{
TTLSecondsAfterFinished: stdlib.Ptr[int32](100),
Parallelism: stdlib.Ptr[int32](tr.instances), // TODO: configurable
Completions: stdlib.Ptr[int32](tr.instances), // TODO: configurable
ManualSelector: stdlib.Ptr(true),
Selector: &k8smetav1.LabelSelector{
MatchLabels: map[string]string{
labelKeyTaskName: taskName,
},
},
Template: k8scorev1.PodTemplateSpec{
ObjectMeta: k8smetav1.ObjectMeta{
Labels: map[string]string{
labelKeyTaskName: taskName,
},
},
Spec: k8scorev1.PodSpec{
Containers: []k8scorev1.Container{
{
Name: containerNameRunner,
Image: k6RunnerImage,
Args: []string{"run", scriptToRun},
VolumeMounts: []k8scorev1.VolumeMount{
{
Name: scriptsVolumeName,
MountPath: containerScriptsPath,
},
},
EnvFrom: envFrom,
WorkingDir: containerScriptsPath,
},
},
RestartPolicy: "Never",
Volumes: volumes,
Affinity: &k8scorev1.Affinity{
PodAntiAffinity: &k8scorev1.PodAntiAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: []k8scorev1.PodAffinityTerm{
{
LabelSelector: &k8smetav1.LabelSelector{
MatchExpressions: []k8smetav1.LabelSelectorRequirement{
{
Key: labelKeyTaskName,
Operator: "In",
Values: []string{taskName},
},
},
},
TopologyKey: "kubernetes.io/hostname",
},
},
},
},
},
},
},
}
return rv, nil
}
func (tr *taskRunner) followJobLogs(
ctx context.Context,
job *k8sbatchv1.Job,
) error {
selector, err := k8smetav1.LabelSelectorAsSelector(job.Spec.Selector)
if err != nil {
return fmt.Errorf("invalid job selector: %w", err)
}
addPrefix := false
if stdlib.ValOrZero(job.Spec.Parallelism) > 1 {
// there might be multiple pods, add prefix to distinguish them
addPrefix = true
}
return kubelib.FollowLogs(
ctx,
tr.kubeClient,
&kubelib.FollowLogsParams{
Namespace: job.Namespace,
Selector: selector,
Container: containerNameRunner,
AddPrefix: addPrefix,
Output: os.Stderr,
},
)
}