in runner/group/handler.go [314:354]
// waitForJob consumes events from w until the watched Job finishes, the
// context is cancelled, or the watch fails.
//
// After every Job event that does not end the wait, *rv is overwritten with
// that Job's ResourceVersion. rv is dereferenced unconditionally and must be
// non-nil. NOTE(review): presumably the caller uses *rv to resume the watch
// after a retryable error — confirm against the call site.
//
// Return values:
//   - nil once a watch.Modified event carries a Job for which jobFinished
//     reports true;
//   - ctx.Err() when ctx is done;
//   - an error wrapping errRetryable when the result channel is closed;
//   - the API error decoded from the event object on a watch.Error event.
//
// The watch is stopped before returning in every case.
func (h *Handler) waitForJob(ctx context.Context, w watch.Interface, rv *string) error {
	defer w.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case event, ok := <-w.ResultChan():
			if !ok {
				return fmt.Errorf("unexpected closed watch channel: %w", errRetryable)
			}
			if event.Type == watch.Error {
				return apierrors.FromObject(event.Object)
			}

			// Skip anything that is not a *batchv1.Job instead of asserting
			// blindly and risking a panic.
			job, isJob := event.Object.(*batchv1.Job)
			if !isJob {
				klog.V(2).Infof("unexpected type: %v", reflect.TypeOf(event.Object))
				continue
			}

			switch event.Type {
			case watch.Modified:
				// Spec.Completions is an optional pointer. Log arguments are
				// evaluated even when verbosity 5 is disabled, so an
				// unguarded dereference would panic for Jobs that leave it
				// unset.
				var expected interface{} = "<unset>"
				if c := job.Spec.Completions; c != nil {
					expected = *c
				}
				klog.V(5).Infof("Job %s Expected %v, Failed %v, Successed: %v",
					job.Name, expected, job.Status.Failed, job.Status.Succeeded)

				if jobFinished(job) {
					return nil
				}
			default:
				klog.V(2).Infof("receive event type %s", event.Type)
			}

			// Record the version of the last Job event that was processed.
			*rv = job.ResourceVersion
		}
	}
}