internal/task/run.go (188 lines of code) (raw):
package task
import (
"context"
"fmt"
"path/filepath"
"github.com/sourcegraph/conc/iter"
k8sbatchv1 "k8s.io/api/batch/v1"
k8scorev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
k8smetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"github.com/Azure/k6ctl/internal/config"
"github.com/Azure/k6ctl/internal/target"
)
func RunTask(
ctx context.Context,
target target.Target,
getConfigProviderByName config.GetConfigProviderByName,
taskConfig *Schema,
sourceBaseDir string,
script string,
options ...RunTaskOption,
) error {
opt := defaultRunTaskOption()
for _, o := range options {
if err := o.apply(opt); err != nil {
return err
}
}
if s, err := filepath.Abs(filepath.Clean(sourceBaseDir)); err != nil {
return fmt.Errorf("invalid source base dir %q: %w", sourceBaseDir, err)
} else {
sourceBaseDir = s
}
kubeconfig, ok := target.GetKubeconfig()
if !ok {
return fmt.Errorf("target does not have kubeconfig")
}
kubeClient, err := opt.KubeClientFactory(kubeconfig)
if err != nil {
return fmt.Errorf("failed to create kubernetes client: %w", err)
}
tr := &taskRunner{
target: target,
kubeClient: kubeClient,
instances: opt.Instances,
followLogs: opt.FollowLogs,
getConfigProviderByName: getConfigProviderByName,
taskConfig: taskConfig,
sourceBaseDir: sourceBaseDir,
script: script,
}
return tr.Run(ctx)
}
type taskRunner struct {
target target.Target
kubeClient kubernetes.Interface
followLogs bool
instances int32
getConfigProviderByName config.GetConfigProviderByName
taskConfig *Schema
sourceBaseDir string
script string
}
type createOrUpdateClient[T any] interface {
Create(ctx context.Context, obj T, opts k8smetav1.CreateOptions) (T, error)
Update(ctx context.Context, obj T, opts k8smetav1.UpdateOptions) (T, error)
}
func createOrUpdateObject[T any](
ctx context.Context,
client createOrUpdateClient[T],
obj T,
) (T, error) {
objCreated, err := client.Create(ctx, obj, k8smetav1.CreateOptions{})
switch {
case err == nil:
return objCreated, nil
case k8serrors.IsAlreadyExists(err):
// update
default:
var empty T
return empty, err
}
return client.Update(ctx, obj, k8smetav1.UpdateOptions{})
}
func (tr *taskRunner) Run(ctx context.Context) error {
var (
secretsToCreate []*k8scorev1.Secret
configMapsToCreate []*k8scorev1.ConfigMap
jobsToCreate []*k8sbatchv1.Job
)
if len(tr.taskConfig.Configs) > 0 {
configs, err := tr.resolveConfigs(ctx, tr.taskConfig.Configs)
if err != nil {
return fmt.Errorf("failed to resolve configs: %w", err)
}
configsSecretObject, err := tr.buildConfigSecretObject(ctx, configs)
if err != nil {
return fmt.Errorf("failed to build config secret object: %w", err)
}
secretsToCreate = append(secretsToCreate, configsSecretObject)
}
if len(tr.taskConfig.Files) > 0 {
scriptsConfigMapObject, err := tr.buildScriptsConfigMapObject(ctx)
if err != nil {
return fmt.Errorf("failed to build scripts config map object: %w", err)
}
configMapsToCreate = append(configMapsToCreate, scriptsConfigMapObject)
}
jobObject, err := tr.buildJobObject()
if err != nil {
return err
}
jobsToCreate = append(jobsToCreate, jobObject)
secretsClient := tr.kubeClient.CoreV1().Secrets(tr.objectNamespace())
configMapsClient := tr.kubeClient.CoreV1().ConfigMaps(tr.objectNamespace())
jobsClient := tr.kubeClient.BatchV1().Jobs(tr.objectNamespace())
for _, secret := range secretsToCreate {
_, err := createOrUpdateObject(ctx, secretsClient, secret)
if err != nil {
return fmt.Errorf("failed to create secret %q: %w", secret.Name, err)
}
}
for _, configMap := range configMapsToCreate {
_, err := createOrUpdateObject(ctx, configMapsClient, configMap)
if err != nil {
return fmt.Errorf("failed to create config map %q: %w", configMap.Name, err)
}
}
for _, job := range jobsToCreate {
_, err := createOrUpdateObject(ctx, jobsClient, job)
if err != nil {
return fmt.Errorf("failed to create job %q: %w", job.Name, err)
}
}
if tr.followLogs {
return tr.followJobLogs(ctx, jobObject)
}
return nil
}
func (tr *taskRunner) taskJobName() string {
return fmt.Sprintf("k6ctl-job-%s", tr.taskConfig.Name)
}
func (tr *taskRunner) configsSecretName() string {
return fmt.Sprintf("k6ctl-configs-secret-%s", tr.taskConfig.Name)
}
func (tr *taskRunner) scriptsConfigMapName() string {
return fmt.Sprintf("k6ctl-scripts-config-%s", tr.taskConfig.Name)
}
func (tr *taskRunner) objectNamespace() string {
return tr.taskConfig.K6.Namespace
}
func isChildPath(basePath string, childPath string) bool { // TODO: revisit this implementation
relPath, err := filepath.Rel(basePath, childPath)
if err != nil {
return false
}
return relPath != ".." && !filepath.IsAbs(relPath)
}
type resolvedConfig struct {
Value string
Env string
}
func (tr *taskRunner) resolveConfigs(
ctx context.Context,
configProviders []ConfigProvider,
) ([]resolvedConfig, error) {
return iter.MapErr(configProviders, func(cp *ConfigProvider) (resolvedConfig, error) {
return tr.resolveConfig(ctx, *cp)
})
}
func (tr *taskRunner) resolveConfig(
ctx context.Context,
configProvider ConfigProvider,
) (resolvedConfig, error) {
p, ok := tr.getConfigProviderByName(configProvider.Provider.Name)
if !ok {
return resolvedConfig{}, fmt.Errorf("no config provider %q", configProvider.Provider.Name)
}
value, err := p.Resolve(ctx, tr.target, configProvider.Provider.Params)
if err != nil {
return resolvedConfig{}, fmt.Errorf("%s: failed to resolve config: %w", p.Name(), err)
}
rv := resolvedConfig{
Value: value,
Env: configProvider.Env,
}
return rv, nil
}