func()

in spark/client/retry.go [206:250]


func (r *retriableSparkConnectClient) ExecutePlan(ctx context.Context, in *proto.ExecutePlanRequest,
	opts ...grpc.CallOption,
) (proto.SparkConnectService_ExecutePlanClient, error) {
	var lastErr error
	// Create the retry state for this wrapped call. The retry state captures the information about
	// the wait time and how many retries to perform.
	state := retryState{}
	// As long as the error is retriable, we will retry the operation.
	canRetry := true
	for canRetry {
		// Every loop iteration starts with being non-retriable.
		canRetry = false
		response, lastErr := r.client.ExecutePlan(ctx, in, opts...)
		if lastErr != nil {
			for _, h := range r.retryPolicies {
				if h.Handler(lastErr) {
					canRetry = true
					wait := state.nextAttempt(h)
					if wait != nil {
						time.Sleep(*wait)
					} else {
						// If the retries are exceeded, simply return from here.
						return nil, sparkerrors.WithType(lastErr, sparkerrors.RetriesExceeded)
					}
					// Breaks out of the retry handler loop.
					break
				}
			}
		} else {
			// Exit loop if no error has been received.
			rc := retriableExecutePlanClient{
				context: ctx,
				retryContext: &retryContext{
					stream:         response,
					client:         r,
					request:        in,
					resultComplete: false,
					retryPolicies:  r.retryPolicies,
				},
			}
			return rc, nil
		}
	}
	return nil, sparkerrors.WithType(lastErr, sparkerrors.RetriesExceeded)
}