in conn.go [643:737]
func (c *Conn) processFrame(ctx context.Context, r io.Reader) error {
// not safe for concurrent reads
// read a full header, ignore timeouts, as this is being ran in a loop
// TODO: TCP level deadlines? or just query level deadlines?
if c.r.GetTimeout() > 0 {
c.r.SetReadDeadline(time.Time{})
}
headStartTime := time.Now()
// were just reading headers over and over and copy bodies
head, err := readHeader(r, c.headerBuf[:])
headEndTime := time.Now()
if err != nil {
return err
}
if c.frameObserver != nil {
c.frameObserver.ObserveFrameHeader(context.Background(), ObservedFrameHeader{
Version: protoVersion(head.version),
Flags: head.flags,
Stream: int16(head.stream),
Opcode: frameOp(head.op),
Length: int32(head.length),
Start: headStartTime,
End: headEndTime,
Host: c.host,
})
}
if head.stream > c.streams.NumStreams {
return fmt.Errorf("gocql: frame header stream is beyond call expected bounds: %d", head.stream)
} else if head.stream == -1 {
// TODO: handle cassandra event frames, we shouldnt get any currently
framer := newFramer(c.compressor, c.version)
if err := framer.readFrame(r, &head); err != nil {
return err
}
go c.session.handleEvent(framer)
return nil
} else if head.stream <= 0 {
// reserved stream that we dont use, probably due to a protocol error
// or a bug in Cassandra, this should be an error, parse it and return.
framer := newFramer(c.compressor, c.version)
if err := framer.readFrame(r, &head); err != nil {
return err
}
frame, err := framer.parseFrame()
if err != nil {
return err
}
return &protocolError{
frame: frame,
}
}
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return ErrConnectionClosed
}
call, ok := c.calls[head.stream]
delete(c.calls, head.stream)
c.mu.Unlock()
if call == nil || !ok {
c.logger.Printf("gocql: received response for stream which has no handler: header=%v\n", head)
return c.discardFrame(r, head)
} else if head.stream != call.streamID {
panic(fmt.Sprintf("call has incorrect streamID: got %d expected %d", call.streamID, head.stream))
}
framer := newFramer(c.compressor, c.version)
err = framer.readFrame(r, &head)
if err != nil {
// only net errors should cause the connection to be closed. Though
// cassandra returning corrupt frames will be returned here as well.
if _, ok := err.(net.Error); ok {
return err
}
}
// we either, return a response to the caller, the caller timedout, or the
// connection has closed. Either way we should never block indefinatly here
select {
case call.resp <- callResp{framer: framer, err: err}:
case <-call.timeout:
c.releaseStream(call)
case <-ctx.Done():
}
return nil
}