in spark/client/retry.go [206:250]
// ExecutePlan invokes ExecutePlan on the wrapped client and repeats the call for as long as one
// of r.retryPolicies reports the returned error as retriable and the retry state still grants
// another attempt.
//
// On success the returned stream is wrapped in a retriableExecutePlanClient that carries the
// original request and the retry policies along with the stream.
//
// On failure the returned client is nil and the error is one of:
//   - the last error of the underlying call, typed as sparkerrors.RetriesExceeded, when the
//     retry state grants no further attempt or when no policy accepts the error;
//   - ctx.Err() when ctx is done while waiting between two attempts.
func (r *retriableSparkConnectClient) ExecutePlan(ctx context.Context, in *proto.ExecutePlanRequest,
	opts ...grpc.CallOption,
) (proto.SparkConnectService_ExecutePlanClient, error) {
	// lastErr must live outside the loop: it is reported by the final return statement. It is
	// therefore only ever assigned with `=`, never re-declared with `:=` inside the loop.
	var lastErr error
	// Create the retry state for this wrapped call. The retry state captures the information about
	// the wait time and how many retries to perform.
	state := retryState{}

	// As long as the error is retriable, we will retry the operation.
	canRetry := true
	for canRetry {
		// Every loop iteration starts with being non-retriable.
		canRetry = false

		response, err := r.client.ExecutePlan(ctx, in, opts...)
		if err == nil {
			// No error has been received: hand the stream back to the caller.
			return retriableExecutePlanClient{
				context: ctx,
				retryContext: &retryContext{
					stream:         response,
					client:         r,
					request:        in,
					resultComplete: false,
					retryPolicies:  r.retryPolicies,
				},
			}, nil
		}
		lastErr = err

		for _, h := range r.retryPolicies {
			if !h.Handler(lastErr) {
				continue
			}
			wait := state.nextAttempt(h)
			if wait == nil {
				// If the retries are exceeded, simply return from here.
				return nil, sparkerrors.WithType(lastErr, sparkerrors.RetriesExceeded)
			}
			// Wait for the backoff, but give up immediately when the caller's context ends
			// instead of blocking for the whole duration.
			timer := time.NewTimer(*wait)
			select {
			case <-ctx.Done():
				timer.Stop()
				return nil, ctx.Err()
			case <-timer.C:
			}
			canRetry = true
			// Only the first policy that accepts the error is consulted.
			break
		}
	}
	// Reached when no policy considers lastErr retriable. The RetriesExceeded type is kept so
	// that callers observe the same error type as before.
	return nil, sparkerrors.WithType(lastErr, sparkerrors.RetriesExceeded)
}