in contrib/utils/utils.go [43:105]
func RepeatJobWithPod(ctx context.Context, kubeCfgPath string, namespace string, target string, internal time.Duration) {
infoLogger := log.GetLogger(ctx).WithKeyValues("level", "info")
warnLogger := log.GetLogger(ctx).WithKeyValues("level", "warn")
infoLogger.LogKV("msg", "repeat to create job with 3k pods")
data, err := manifests.FS.ReadFile(target)
if err != nil {
panic(fmt.Errorf("unexpected error when read %s from embed memory: %v",
target, err))
}
jobFile, cleanup, err := CreateTempFileWithContent(data)
if err != nil {
panic(fmt.Errorf("unexpected error when create job yaml: %v", err))
}
defer func() { _ = cleanup() }()
kr := NewKubectlRunner(kubeCfgPath, namespace)
infoLogger.LogKV("msg", "creating namespace", "name", namespace)
err = kr.CreateNamespace(ctx, 5*time.Minute, namespace)
if err != nil {
panic(fmt.Errorf("failed to create a new namespace %s: %v", namespace, err))
}
defer func() {
infoLogger.LogKV("msg", "cleanup namespace", "name", namespace)
err := kr.DeleteNamespace(context.TODO(), 60*time.Minute, namespace)
if err != nil {
warnLogger.LogKV("msg", "failed to cleanup namespace", "name", namespace, "error", err)
}
}()
retryInterval := 5 * time.Second
for {
select {
case <-ctx.Done():
infoLogger.LogKV("msg", "stop creating job")
return
default:
}
time.Sleep(retryInterval)
aerr := kr.Apply(ctx, 5*time.Minute, jobFile)
if aerr != nil {
warnLogger.LogKV("msg", "failed to apply job, retry after 5 seconds", "job", target, "error", aerr)
continue
}
werr := kr.Wait(ctx, 15*time.Minute, "condition=complete", "15m", "job/batchjobs")
if werr != nil {
warnLogger.LogKV("msg", "failed to wait job finish", "job", target, "error", werr)
}
derr := kr.Delete(ctx, 5*time.Minute, jobFile)
if derr != nil {
warnLogger.LogKV("msg", "failed to delete job", "job", target, "error", derr)
}
time.Sleep(internal)
}
}