func()

in runner/group/handler.go [235:311]


// Wait blocks until the job h.name in h.namespace is reported as finished,
// ctx is cancelled, or a non-retryable error occurs.
//
// It first fetches the job and returns nil right away when jobFinished
// reports true. Otherwise it opens a watch restricted to that job name,
// starting from the fetched resource version, and hands the watcher to
// h.waitForJob. Retry policy:
//
//   - connection refused, 429 (too many requests) and 500 (internal error)
//     responses, as well as errRetryable, trigger an exponential backoff
//     followed by a new watch;
//   - resource-expired / gone responses from waitForJob reset the resource
//     version and start a new watch immediately;
//   - any other error is returned to the caller.
//
// Cancelling ctx interrupts a pending backoff wait as well, so the call
// returns ctx.Err() without sleeping through the remaining backoff period.
func (h *Handler) Wait(ctx context.Context) error {
	cli := h.clientset.BatchV1().Jobs(h.namespace)

	job, err := cli.Get(ctx, h.name, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get job %s from namespace %s: %w",
			h.name, h.namespace, err)
	}

	if jobFinished(job) {
		return nil
	}

	// NOTE: It's to align with client-go package. Please check out the
	// following reference for detail.
	//
	// https://github.com/kubernetes/client-go/blob/v0.28.4/tools/cache/reflector.go#L219
	//
	// Arguments: initial backoff 800ms, max backoff 30s, reset duration
	// 2min, backoff factor 2.0, jitter 1.0.
	//
	// TODO(weifu): fix staticcheck check
	//
	//nolint:staticcheck
	backoff := wait.NewExponentialBackoffManager(
		800*time.Millisecond, 30*time.Second, 2*time.Minute,
		2.0, 1.0, &clock.RealClock{})

	// waitBackoff sleeps for the next backoff period, but gives up early
	// with ctx.Err() when the context is cancelled in the meantime.
	waitBackoff := func() error {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-backoff.Backoff().C():
			return nil
		}
	}

	lastRv := job.ResourceVersion
	fieldSelector := fields.OneTermEqualSelector(metav1.ObjectNameField, h.name).String()

	for {
		// Bail out before issuing another request if the caller gave up.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		opts := metav1.ListOptions{
			FieldSelector:       fieldSelector,
			ResourceVersion:     lastRv,
			AllowWatchBookmarks: true,
		}

		w, err := cli.Watch(ctx, opts)
		if err != nil {
			// should retry if apiserver is down or unavailable.
			if utilnet.IsConnectionRefused(err) ||
				apierrors.IsTooManyRequests(err) ||
				apierrors.IsInternalError(err) {

				if cerr := waitBackoff(); cerr != nil {
					return cerr
				}

				continue
			}

			return fmt.Errorf("failed to initialize watch for job %s: %w", h.name, err)
		}

		// waitForJob receives a pointer to lastRv; presumably it records
		// the most recent resource version it observed so that a retry
		// resumes from there (verify against waitForJob).
		err = h.waitForJob(ctx, w, &lastRv)
		if err == nil {
			return nil
		}

		switch {
		case apierrors.IsResourceExpired(err) || apierrors.IsGone(err):
			klog.V(2).Infof("reset last seen revision and continue, since receive: %v", err)
			lastRv = ""
		// should retry if apiserver is down or unavailable.
		case apierrors.IsTooManyRequests(err) ||
			apierrors.IsInternalError(err) ||
			errors.Is(err, errRetryable):
			if cerr := waitBackoff(); cerr != nil {
				return cerr
			}
		default:
			return err
		}
	}
}