in conn.go [1180:1335]
// execInternal sends one request frame on a free stream of this connection
// and blocks until a response is delivered or the wait is abandoned.
//
// Parameters:
//   - ctx: checked on entry, passed to the write, and watched while waiting.
//   - req: builds the request frame into a freshly created framer.
//   - tracer: when non-nil, trace() is invoked on the request framer; the
//     tracer value itself is not otherwise used here.
//   - startupCompleted: together with c.version > protoVersion4, decides
//     whether prepareModernLayout is applied to the frame before writing.
//
// On success the response framer is returned. Otherwise the error is one of:
//   - ctx.Err(), if ctx is done on entry or becomes done while waiting;
//   - ErrNoStreams, if no stream ID could be obtained;
//   - the error returned by addCall, buildFrame, prepareModernLayout or the
//     write (a write failure may also close the connection, see below);
//   - the error delivered together with the response;
//   - a protocol error when the response version differs from c.version;
//   - ErrTimeoutNoResponse, when the timeout from c.r.GetTimeout elapses;
//   - ErrConnectionClosed, when c.ctx is done.
func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer, startupCompleted bool) (*framer, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	// TODO: move tracer onto conn
	streamID, ok := c.streams.GetStream()
	if !ok {
		return nil, ErrNoStreams
	}

	// call.resp is basically a waiting semaphore protecting the framer.
	fr := newFramer(c.compressor, c.version)
	call := &callReq{
		timeout:  make(chan struct{}),
		streamID: streamID,
		resp:     make(chan callResp),
	}
	if c.streamObserver != nil {
		call.streamObserverContext = c.streamObserver.StreamContext(ctx)
	}

	if err := c.addCall(call); err != nil {
		return nil, err
	}

	// From here on every exit path must either receive from call.resp or
	// close(call.timeout): closeWithError can try to deliver a connection
	// close error on call.resp and would otherwise deadlock.

	// forgetCall drops a call for which no response can arrive. The entry is
	// removed from c.calls (unless the connection is already closed) before
	// the stream is released; releasing first could let a new request on the
	// same stream ID run into the stale entry (the "existingCall != nil"
	// check, presumably the one in addCall; confirm).
	forgetCall := func() {
		c.mu.Lock()
		if !c.closed {
			delete(c.calls, call.streamID)
		}
		c.mu.Unlock()
		c.releaseStream(call)
	}

	if tracer != nil {
		fr.trace()
	}

	if call.streamObserverContext != nil {
		call.streamObserverContext.StreamStarted(ObservedStream{
			Host: c.host,
		})
	}

	if err := req.buildFrame(fr, streamID); err != nil {
		// Unblock closeWithError, which waits for this stream to get a
		// response or to time out.
		close(call.timeout)
		// Serializing into the buffer failed, so nothing was written and the
		// connection itself is unaffected; only this call is freed.
		forgetCall()
		return nil, err
	}

	var (
		written  int
		writeErr error
	)
	if c.version > protoVersion4 && startupCompleted {
		writeErr = fr.prepareModernLayout()
	}
	if writeErr == nil {
		written, writeErr = c.w.writeContext(ctx, fr.buf)
	}
	if writeErr != nil {
		// Unblock closeWithError (see above). No response is expected after
		// a failure on the write side, although that is not certain.
		close(call.timeout)

		ctxEnded := errors.Is(writeErr, context.Canceled) || errors.Is(writeErr, context.DeadlineExceeded)
		if written != 0 || !ctxEnded {
			// Probably the right call, though not ideal: readers might still
			// receive data. Keeping the connection would leave this stream
			// unavailable, and if every write times out or fails the pool
			// could keep picking a connection whose streams are all used up
			// and never returned.
			c.closeWithError(writeErr)
			return nil, writeErr
		}

		// The context ended before any byte of this frame was written, so
		// the server cannot answer on this stream and it is safe to free it.
		forgetCall()
		return nil, writeErr
	}

	var timedOut <-chan time.Time
	if timeout := c.r.GetTimeout(); timeout > 0 {
		switch {
		case call.timer == nil:
			// Create the timer already expired and consume its tick, so the
			// Reset below starts from a drained channel.
			call.timer = time.NewTimer(0)
			<-call.timer.C
		case !call.timer.Stop():
			// The existing timer had already fired or been stopped; discard
			// a pending tick if there is one.
			select {
			case <-call.timer.C:
			default:
			}
		}
		call.timer.Reset(timeout)
		timedOut = call.timer.C
	}

	// ctx cannot be nil at this point: ctx.Err() was already called on entry,
	// so its Done channel is selected on directly.
	select {
	case resp := <-call.resp:
		close(call.timeout)
		if resp.err != nil {
			// With the connection closed the stream must stay reserved: the
			// request is still outstanding and this error was handed over
			// from another stream that caused the close.
			if !c.Closed() {
				c.releaseStream(call)
			}
			return nil, resp.err
		}

		// The stream is released only on this path. After a timeout it stays
		// reserved, because a later request could reuse it and receive the
		// reply meant for the old one, which cannot easily be detected.
		// Keeping it reserved while requests may be outstanding also guards
		// against nil pointer dereferences in recv().
		defer c.releaseStream(call)

		if got := resp.framer.header.version.version(); got != c.version {
			return nil, NewErrProtocol("unexpected protocol version in response: got %d expected %d", got, c.version)
		}
		return resp.framer, nil

	case <-timedOut:
		close(call.timeout)
		c.handleTimeout()
		return nil, ErrTimeoutNoResponse

	case <-ctx.Done():
		close(call.timeout)
		return nil, ctx.Err()

	case <-c.ctx.Done():
		close(call.timeout)
		return nil, ErrConnectionClosed
	}
}