func()

in spark/client/retry.go [386:435]


// Recv returns the next response of the plan execution. The receive is run
// through wrapRetriableCall with the configured retry policies.
//
// On success the response ID is remembered so that a later failure can resume
// after it. io.EOF is passed through untouched. On any other error the
// underlying stream is replaced before the original error is returned:
//   - if no response has been received yet, the original request is sent again;
//   - otherwise a reattach request carrying the operation ID and the last
//     response ID is sent.
//
// NOTE(review): presumably wrapRetriableCall invokes the callback again when the
// returned error matches a retry policy, so the next attempt reads from the
// replacement stream — confirm against wrapRetriableCall.
func (r retriableExecutePlanClient) Recv() (*proto.ExecutePlanResponse, error) {
	return wrapRetriableCall(r.context, r.retryContext.retryPolicies, func(ctx2 context.Context) (*proto.ExecutePlanResponse, error) {
		resp, err := r.retryContext.stream.Recv()
		// Success, remember where we are in the stream and return the result.
		if err == nil {
			r.retryContext.lastResponseId = &resp.ResponseId
			return resp, nil
		}
		// A clean end of stream is not a failure; hand it to the caller as is.
		if errors.Is(err, io.EOF) {
			return nil, err
		}

		// From here on the receive has failed. The stream is re-established and
		// the original error is returned in every case.
		if r.retryContext.lastResponseId == nil {
			// Nothing was received so far: send the same request again.
			rs, execErr := r.retryContext.client.ExecutePlan(ctx2, r.retryContext.request)
			if execErr != nil {
				return nil, execErr
			}
			switch stream := rs.(type) {
			case retriableExecutePlanClient:
				// Unwrap so that retriable clients are not nested inside each other.
				r.retryContext.stream = stream.retryContext.stream
			default:
				r.retryContext.stream = stream
			}
			return nil, err
		}

		// At least one response was received: reattach to the running operation.
		opID := r.retryContext.request.OperationId
		if opID == nil {
			// Without an operation ID there is nothing to reattach to. Return the
			// receive error instead of panicking on a nil dereference.
			return nil, err
		}
		req := &proto.ReattachExecuteRequest{
			SessionId:      r.retryContext.request.SessionId,
			UserContext:    r.retryContext.request.UserContext,
			OperationId:    *opID,
			LastResponseId: r.retryContext.lastResponseId,
		}
		re, execErr := r.retryContext.client.ReattachExecute(ctx2, req)
		if execErr != nil {
			return nil, execErr
		}
		switch stream := re.(type) {
		case retriableExecutePlanClient:
			// Unwrap so that retriable clients are not nested inside each other.
			r.retryContext.stream = stream.retryContext.stream
		default:
			r.retryContext.stream = stream
		}
		return nil, err
	})
}