func()

in conn.go [1180:1335]


// execInternal sends one request frame on a newly acquired stream and blocks
// until a response is delivered on the call's resp channel, the timeout
// reported by c.r.GetTimeout() elapses, ctx is done, or c.ctx is done.
//
// ctx is checked up front, is passed to the write, and bounds the wait for the
// response. req serializes the request into a new framer. A non-nil tracer only
// turns on tracing for the outgoing frame; the tracer value itself is not used
// here. When startupCompleted is true and c.version is greater than
// protoVersion4, prepareModernLayout is called on the framer before writing.
//
// On success it returns the framer carried by the response. Otherwise it
// returns nil together with one of: ctx's error, ErrNoStreams, the error from
// addCall, the frame build/layout/write error, the error delivered on the resp
// channel, a protocol error when the response version differs from c.version,
// ErrTimeoutNoResponse, or ErrConnectionClosed.
func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer, startupCompleted bool) (*framer, error) {
	// Fail fast when the caller's context is already cancelled or expired.
	if ctxErr := ctx.Err(); ctxErr != nil {
		return nil, ctxErr
	}

	// TODO: move tracer onto conn
	stream, ok := c.streams.GetStream()
	if !ok {
		return nil, ErrNoStreams
	}

	// framer buffers the outgoing request. The response comes back in a
	// separate framer delivered on call.resp, which is what this function
	// waits on further down.
	framer := newFramer(c.compressor, c.version)

	// Both channels are unbuffered. call.timeout is closed by this function
	// on every path where it stops waiting for (or has consumed) call.resp.
	call := &callReq{
		timeout:  make(chan struct{}),
		streamID: stream,
		resp:     make(chan callResp),
	}

	if c.streamObserver != nil {
		call.streamObserverContext = c.streamObserver.StreamContext(ctx)
	}

	// NOTE(review): presumably addCall registers call in c.calls (the cleanup
	// paths below delete it from there). On failure we return without
	// releasing the stream obtained above; confirm that is intentional.
	if err := c.addCall(call); err != nil {
		return nil, err
	}

	// After this point, we need to either read from call.resp or close(call.timeout)
	// since closeWithError can try to write a connection close error to call.resp.
	// If we don't close(call.timeout) or read from call.resp, closeWithError can deadlock.

	// The tracer is only used as a flag here: it enables tracing on the frame.
	if tracer != nil {
		framer.trace()
	}

	if call.streamObserverContext != nil {
		call.streamObserverContext.StreamStarted(ObservedStream{
			Host: c.host,
		})
	}

	err := req.buildFrame(framer, stream)
	if err != nil {
		// closeWithError will block waiting for this stream to either receive a response
		// or for us to time out, so signal that we are no longer waiting.
		close(call.timeout)
		// We failed to serialize the frame into a buffer.
		// This should not affect the connection as we didn't write anything. We just free the current call.
		c.mu.Lock()
		if !c.closed {
			delete(c.calls, call.streamID)
		}
		c.mu.Unlock()
		// The stream must be released only after the call has been removed from
		// c.calls; otherwise an existingCall != nil check (not in this function,
		// presumably in addCall) could fail for the next user of the stream.
		c.releaseStream(call)
		return nil, err
	}

	// n is the byte count reported by writeContext; it stays 0 when the write
	// is never attempted.
	var n int

	// err is nil here, so it is only set again by the layout step or the write.
	if c.version > protoVersion4 && startupCompleted {
		err = framer.prepareModernLayout()
	}
	if err == nil {
		n, err = c.w.writeContext(ctx, framer.buf)
	}
	if err != nil {
		// closeWithError will block waiting for this stream to either receive a
		// response or for us to time out, so close the timeout chan here. The
		// original author was not entirely sure, but we should not get a
		// response after an error on the write side.
		close(call.timeout)
		if (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) && n == 0 {
			// We have not started to write this frame.
			// Release the stream as no response can come from the server on the stream.
			c.mu.Lock()
			if !c.closed {
				delete(c.calls, call.streamID)
			}
			c.mu.Unlock()
			// The stream must be released only after the call has been removed from
			// c.calls; otherwise an existingCall != nil check (not in this function,
			// presumably in addCall) could fail for the next user of the stream.
			c.releaseStream(call)
		} else {
			// Any other failure (including a context error after a partial
			// write, or a prepareModernLayout error) closes the connection.
			// The original author was not entirely sure this is correct. It is
			// not ideal as readers might still get some data, but they probably
			// won't. Here we need to be careful as the stream is not available
			// and if all writes just time out or fail then the pool might use
			// this connection to send a frame on, with all the streams used up
			// and not returned.
			c.closeWithError(err)
		}
		return nil, err
	}

	// timeoutCh stays nil when no positive timeout is configured; a nil channel
	// never becomes ready in the select below.
	var timeoutCh <-chan time.Time
	if timeout := c.r.GetTimeout(); timeout > 0 {
		// NOTE(review): call is constructed above without a timer, so unless
		// addCall or another holder of call sets one, the first branch is
		// always taken; confirm whether the else branch is still reachable.
		if call.timer == nil {
			// Create a timer that fires immediately and drain it, so that the
			// Reset below starts from a stopped timer with an empty channel.
			call.timer = time.NewTimer(0)
			<-call.timer.C
		} else {
			// Stop an existing timer and drain a pending tick without blocking.
			if !call.timer.Stop() {
				select {
				case <-call.timer.C:
				default:
				}
			}
		}

		call.timer.Reset(timeout)
		timeoutCh = call.timer.C
	}

	// NOTE(review): ctx.Err() was already called at the top of this function,
	// so a nil ctx interface would have panicked before reaching this check.
	var ctxDone <-chan struct{}
	if ctx != nil {
		ctxDone = ctx.Done()
	}

	// Every case closes call.timeout first, to signal that this function is no
	// longer going to read from call.resp (see the deadlock note above).
	select {
	case resp := <-call.resp:
		close(call.timeout)
		if resp.err != nil {
			if !c.Closed() {
				// If the connection is closed then we can't release the stream.
				// This is because the request is still outstanding and we have
				// been handed another error from another stream which caused the
				// connection to close.
				c.releaseStream(call)
			}
			return nil, resp.err
		}
		// The stream is released only on this path, where a response actually
		// arrived. It is deliberately not released on a timeout, as another
		// request could reuse that stream and get a response for the old
		// request, which we have no easy way of detecting.
		//
		// Ensure that the stream is not released if there are potentially outstanding
		// requests on the stream to prevent nil pointer dereferences in recv().
		//
		// The deferred release runs on return, i.e. after the version check below.
		defer c.releaseStream(call)

		if v := resp.framer.header.version.version(); v != c.version {
			return nil, NewErrProtocol("unexpected protocol version in response: got %d expected %d", v, c.version)
		}

		return resp.framer, nil
	case <-timeoutCh:
		// No response within the configured timeout. The stream is not
		// released here (see the comment in the response case).
		close(call.timeout)
		c.handleTimeout()
		return nil, ErrTimeoutNoResponse
	case <-ctxDone:
		// The caller's context was cancelled or expired while waiting.
		close(call.timeout)
		return nil, ctx.Err()
	case <-c.ctx.Done():
		// The connection's own context is done.
		close(call.timeout)
		return nil, ErrConnectionClosed
	}
}