in internal/stackdiag.go [150:219]
// scheduleJob renders the diagnostics Pod manifest for the given application
// type, (re)creates the Pod in the target namespace, and registers a diagJob
// that is terminated automatically once ds.jobTimeout expires.
//
// typ is the application type (also used to derive the service suffix via
// diagnosticTypeForApplication), esSecretName/esSecretKey locate the
// credentials secret, resourceName names the target resource, and tls
// indicates whether the diagnostics container should use TLS towards the
// service. Returns an error if template rendering, manifest unmarshalling,
// or the delete/create Pod API calls fail.
func (ds *diagJobState) scheduleJob(typ, esSecretName, esSecretKey, resourceName string, tls bool) error {
	podName := fmt.Sprintf("%s-%s-diag", resourceName, typ)

	tpl, err := template.New("job").Parse(jobTemplate)
	if err != nil {
		return err
	}

	diagnosticType, svcSuffix := diagnosticTypeForApplication(typ)
	buffer := new(bytes.Buffer)
	data := map[string]interface{}{
		"PodName":           podName,
		"DiagnosticImage":   ds.diagnosticImage,
		"Namespace":         ds.ns,
		"ESSecretName":      esSecretName,
		"ESSecretKey":       esSecretKey,
		"SVCName":           fmt.Sprintf("%s-%s", resourceName, svcSuffix),
		"Type":              diagnosticType,
		"TLS":               tls,
		"OutputDir":         podOutputDir,
		"MainContainerName": podMainContainerName,
	}
	if err := tpl.Execute(buffer, data); err != nil {
		return err
	}

	var pod corev1.Pod
	if err := yaml.Unmarshal(buffer.Bytes(), &pod); err != nil {
		return err
	}

	// Remove any leftover Pod from a previous run: the Pod name is
	// deterministic, so a stale Pod would make the Create below fail.
	// NotFound is the expected common case and not an error.
	err = ds.kubectl.CoreV1().Pods(ds.ns).Delete(context.Background(), podName, metav1.DeleteOptions{GracePeriodSeconds: ptr.To[int64](0)})
	if err != nil && !apierrors.IsNotFound(err) {
		return err
	}

	if _, err := ds.kubectl.CoreV1().Pods(ds.ns).Create(context.Background(), &pod, metav1.CreateOptions{}); err != nil {
		return err
	}

	job := diagJob{
		RemoteSource: extraction.RemoteSource{
			Namespace:    ds.ns,
			PodName:      podName,
			Typ:          typ,
			ResourceName: resourceName,
			PodOutputDir: podOutputDir,
		},
		// Buffered so that signalling completion never blocks, even if the
		// timer goroutine below has already returned via the timeout path.
		done: make(chan struct{}, 1),
	}

	// start a dedicated timer for each job and terminate the job when the timer expires.
	go func(j *diagJob) {
		timerChan := j.StartTimer(ds.jobTimeout)
		select {
		case <-timerChan:
			logger.Printf("Diagnostic job for %s %s/%s timed out, terminating", j.Typ, j.Namespace, j.ResourceName)
			// Use a goroutine-local err here: assigning to the enclosing
			// function's err variable (as the previous code did) is a data
			// race with the function's own use of err and its return.
			if err := ds.terminateJob(context.Background(), j); err != nil {
				logger.Printf("while terminating job %s", err.Error())
			}
		case <-j.done:
			// we use separate done signal here to avoid building up lots of go routines that are only terminated by
			// the overall termination of the program if a job does not exceed its timeout.
		}
	}(&job)

	ds.jobs[podName] = &job
	return nil
}