in spark/client/retry.go [386:435]
// Recv returns the next response of the execute-plan stream. The read is run
// through wrapRetriableCall with the client's context and retry policies.
//
// Each attempt reads one message from the current stream:
//   - On success the response ID is stored as the last seen response ID and
//     the response is returned.
//   - io.EOF is passed through unchanged; it signals a successful closure of
//     the stream.
//   - On any other error the stream is replaced and the original error is
//     returned. If no response has been received yet, the original request is
//     executed again; otherwise a ReattachExecute request carrying the
//     operation ID and the last response ID is sent. If creating the
//     replacement stream fails, that error is returned instead.
//
// NOTE(review): returning the original error after swapping the stream
// presumably makes wrapRetriableCall invoke the callback again, which then
// reads from the new stream — confirm against wrapRetriableCall.
//
// NOTE(review): the receiver is a value, so the state updates below only
// persist across calls if retryContext is a pointer — confirm.
func (r retriableExecutePlanClient) Recv() (*proto.ExecutePlanResponse, error) {
	return wrapRetriableCall(r.context, r.retryContext.retryPolicies, func(ctx context.Context) (*proto.ExecutePlanResponse, error) {
		resp, err := r.retryContext.stream.Recv()
		// Success: remember where we are in the stream and hand back the result.
		if err == nil {
			r.retryContext.lastResponseId = &resp.ResponseId
			return resp, nil
		}
		// Successful closure is not a failure; do not try to recover from it.
		if errors.Is(err, io.EOF) {
			return nil, err
		}

		// From here on the request is assumed to have failed.

		// Case 1: nothing was ever received, so the same request is simply
		// executed again.
		if r.retryContext.lastResponseId == nil {
			rs, execErr := r.retryContext.client.ExecutePlan(ctx, r.retryContext.request)
			if execErr != nil {
				return nil, execErr
			}
			// Unwrap a retriable client so the raw stream is stored rather
			// than another retrying wrapper.
			switch stream := rs.(type) {
			case retriableExecutePlanClient:
				r.retryContext.stream = stream.retryContext.stream
			default:
				r.retryContext.stream = stream
			}
			return nil, err
		}

		// Case 2: at least one response was received, so reattach to the
		// running operation after the last response ID.
		operationID := r.retryContext.request.OperationId
		if operationID == nil {
			// Reattaching needs an operation ID. Without one, surface the
			// stream error instead of panicking on a nil dereference.
			return nil, err
		}
		req := &proto.ReattachExecuteRequest{
			SessionId:      r.retryContext.request.SessionId,
			UserContext:    r.retryContext.request.UserContext,
			OperationId:    *operationID,
			LastResponseId: r.retryContext.lastResponseId,
		}
		re, reattachErr := r.retryContext.client.ReattachExecute(ctx, req)
		if reattachErr != nil {
			return nil, reattachErr
		}
		// Same unwrapping as in the re-execute path.
		switch stream := re.(type) {
		case retriableExecutePlanClient:
			r.retryContext.stream = stream.retryContext.stream
		default:
			r.retryContext.stream = stream
		}
		return nil, err
	})
}