func RepeatJobWithPod()

in contrib/utils/utils.go [43:105]


// RepeatJobWithPod keeps applying, waiting on and deleting the job manifest
// named by target inside namespace until ctx is cancelled.
//
// The manifest is read from manifests.FS and written to a temporary file that
// is handed to the kubectl runner built from kubeCfgPath. The namespace is
// created up front and removed again when the function returns.
//
// Each round pauses for 5 seconds, applies the manifest, waits for
// job/batchjobs to report condition=complete, deletes the manifest and then
// pauses for the duration given by internal. Both pauses are interrupted as
// soon as ctx is cancelled so that shutdown is not delayed by a pending sleep.
//
// The function panics if the manifest cannot be read, the temporary file
// cannot be created, or the namespace cannot be created. Failures inside the
// loop are only logged as warnings.
func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string, target string, internal time.Duration) {
	infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
	warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")

	infoLogger.LogKV("msg", "repeat to create job with 3k pods")

	data, err := manifests.FS.ReadFile(target)
	if err != nil {
		panic(fmt.Errorf("unexpected error when read %s from embed memory: %w",
			target, err))
	}

	jobFile, cleanup, err := CreateTempFileWithContent(data)
	if err != nil {
		panic(fmt.Errorf("unexpected error when create job yaml: %w", err))
	}
	// Best-effort removal of the temporary manifest; nothing useful can be
	// done with a cleanup error at this point.
	defer func() { _ = cleanup() }()

	kr := NewKubectlRunner(kubeCfgPath, namespace)

	infoLogger.LogKV("msg", "creating namespace", "name", namespace)
	err = kr.CreateNamespace(ctx, 5*time.Minute, namespace)
	if err != nil {
		panic(fmt.Errorf("failed to create a new namespace %s: %w", namespace, err))
	}

	defer func() {
		infoLogger.LogKV("msg", "cleanup namespace", "name", namespace)
		// Use a fresh context: ctx is normally already cancelled when this
		// runs, and the namespace still has to be deleted.
		err := kr.DeleteNamespace(context.TODO(), 60*time.Minute, namespace)
		if err != nil {
			warnLogger.LogKV("msg", "failed to cleanup namespace", "name", namespace, "error", err)
		}
	}()

	// pause blocks for d and reports true, or reports false as soon as ctx is
	// cancelled. A non-positive d does not block, matching time.Sleep.
	pause := func(d time.Duration) bool {
		if d <= 0 {
			return ctx.Err() == nil
		}
		timer := time.NewTimer(d)
		defer timer.Stop()

		select {
		case <-ctx.Done():
			return false
		case <-timer.C:
			return true
		}
	}

	retryInterval := 5 * time.Second
	for {
		// Delay before every apply; this also acts as the retry back-off
		// after a failed apply.
		if !pause(retryInterval) {
			infoLogger.LogKV("msg", "stop creating job")
			return
		}

		aerr := kr.Apply(ctx, 5*time.Minute, jobFile)
		if aerr != nil {
			warnLogger.LogKV("msg", "failed to apply job, retry after 5 seconds", "job", target, "error", aerr)
			continue
		}

		werr := kr.Wait(ctx, 15*time.Minute, "condition=complete", "15m", "job/batchjobs")
		if werr != nil {
			warnLogger.LogKV("msg", "failed to wait job finish", "job", target, "error", werr)
		}

		// Delete even when the wait failed so the next round starts clean.
		derr := kr.Delete(ctx, 5*time.Minute, jobFile)
		if derr != nil {
			warnLogger.LogKV("msg", "failed to delete job", "job", target, "error", derr)
		}

		if !pause(internal) {
			infoLogger.LogKV("msg", "stop creating job")
			return
		}
	}
}