in runner/group/handler.go [235:311]
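// Wait blocks until the job has finished, watching it for updates and
// retrying on transient apiserver errors with exponential backoff.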
func (h *Handler) Wait(ctx context.Context) error {
	cli := h.clientset.BatchV1().Jobs(h.namespace)

	job, err := cli.Get(ctx, h.name, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get job %s from namespace %s: %w",
			h.name, h.namespace, err)
	}
	if jobFinished(job) {
		return nil
	}
	// NOTE: The backoff parameters are aligned with those used by the
	// client-go reflector. Please check out the following reference for
	// detail.
	//
	// https://github.com/kubernetes/client-go/blob/v0.28.4/tools/cache/reflector.go#L219
	//
	// TODO(weifu): fix staticcheck check
	//
	//nolint:staticcheck
	backoff := wait.NewExponentialBackoffManager(
		800*time.Millisecond, 30*time.Second, 2*time.Minute,
		2.0, 1.0, &clock.RealClock{})
	lastRv := job.ResourceVersion
	fieldSelector := fields.OneTermEqualSelector(metav1.ObjectNameField, h.name).String()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		opts := metav1.ListOptions{
			FieldSelector:       fieldSelector,
			ResourceVersion:     lastRv,
			AllowWatchBookmarks: true,
		}

		w, err := cli.Watch(ctx, opts)
		if err != nil {
			// Retry if the apiserver is down or unavailable.
			if utilnet.IsConnectionRefused(err) ||
				apierrors.IsTooManyRequests(err) ||
				apierrors.IsInternalError(err) {
				<-backoff.Backoff().C()
				continue
			}
			return fmt.Errorf("failed to initialize watch for job %s: %w", h.name, err)
		}
		err = h.waitForJob(ctx, w, &lastRv)
		if err != nil {
			switch {
			case apierrors.IsResourceExpired(err) || apierrors.IsGone(err):
				klog.V(2).Infof("reset last seen resource version and retry, since the watch returned: %v", err)
				lastRv = ""
				continue
			// Retry if the apiserver is down or unavailable.
			case apierrors.IsTooManyRequests(err) || apierrors.IsInternalError(err):
				<-backoff.Backoff().C()
				continue
			case errors.Is(err, errRetryable):
				<-backoff.Backoff().C()
				continue
			default:
				return err
			}
		}
		return nil
	}
}
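
The helpers jobFinished and waitForJob, along with the errRetryable sentinel,
are defined elsewhere in handler.go and are not shown in this range. For
reference, here is a minimal sketch of what they plausibly look like, assuming
the standard Kubernetes conventions: a Job is finished once it carries a
Complete or Failed condition with status True, the watch loop follows the
usual reflector-style event handling, and errRetryable is a package-level
sentinel error. The condition types come from k8s.io/api/batch/v1 and
k8s.io/api/core/v1, the event types from k8s.io/apimachinery/pkg/watch. This
is a sketch under those assumptions, not the file's actual code.

// jobFinished reports whether the Job has reached a terminal condition.
func jobFinished(job *batchv1.Job) bool {
	for _, cond := range job.Status.Conditions {
		if (cond.Type == batchv1.JobComplete || cond.Type == batchv1.JobFailed) &&
			cond.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}

// waitForJob consumes events from w until the Job finishes, recording the
// last seen resource version so the caller can resume the watch from it.
func (h *Handler) waitForJob(ctx context.Context, w watch.Interface, lastRv *string) error {
	defer w.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case event, ok := <-w.ResultChan():
			if !ok {
				// The server closed the watch; ask the caller to retry.
				return errRetryable
			}
			switch event.Type {
			case watch.Error:
				// Decode the status object into an API error so the caller's
				// IsResourceExpired/IsGone/IsTooManyRequests checks work.
				return apierrors.FromObject(event.Object)
			case watch.Bookmark:
				if job, ok := event.Object.(*batchv1.Job); ok {
					*lastRv = job.ResourceVersion
				}
			default:
				job, ok := event.Object.(*batchv1.Job)
				if !ok {
					continue
				}
				*lastRv = job.ResourceVersion
				if jobFinished(job) {
					return nil
				}
			}
		}
	}
}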