in conn.go [687:766]
func (c *Conn) readFrame() (frames.Frame, error) {
switch {
// Cheaply reuse free buffer space when fully read.
case c.rxBuf.Len() == 0:
c.rxBuf.Reset()
// Prevent excessive/unbounded growth by shifting data to beginning of buffer.
case int64(c.rxBuf.Size()) > int64(c.maxFrameSize):
c.rxBuf.Reclaim()
}
var (
currentHeader frames.Header // keep track of the current header, for frames split across multiple TCP packets
frameInProgress bool // true if in the middle of receiving data for currentHeader
)
for {
// need to read more if buf doesn't contain the complete frame
// or there's not enough in buf to parse the header
if frameInProgress || c.rxBuf.Len() < frames.HeaderSize {
// we MUST reset the idle timeout before each read from net.Conn
if c.idleTimeout > 0 {
_ = c.net.SetReadDeadline(time.Now().Add(c.idleTimeout))
}
err := c.rxBuf.ReadFromOnce(c.net)
if err != nil {
return frames.Frame{}, err
}
}
// parse the header if a frame isn't in progress
if !frameInProgress {
// read more if buf doesn't contain enough to parse the header
// NOTE: we MUST do this ONLY if a frame isn't in progress else we can
// end up stalling when reading frames with bodies smaller than HeaderSize
if c.rxBuf.Len() < frames.HeaderSize {
continue
}
var err error
currentHeader, err = frames.ParseHeader(&c.rxBuf)
if err != nil {
return frames.Frame{}, err
}
frameInProgress = true
}
// check size is reasonable
if currentHeader.Size > math.MaxInt32 { // make max size configurable
return frames.Frame{}, errors.New("payload too large")
}
bodySize := int64(currentHeader.Size - frames.HeaderSize)
// the full frame hasn't been received, keep reading
if int64(c.rxBuf.Len()) < bodySize {
continue
}
frameInProgress = false
// check if body is empty (keepalive)
if bodySize == 0 {
debug.Log(3, "RX (connReader %p): received keep-alive frame", c)
continue
}
// parse the frame
b, ok := c.rxBuf.Next(bodySize)
if !ok {
return frames.Frame{}, fmt.Errorf("buffer EOF; requested bytes: %d, actual size: %d", bodySize, c.rxBuf.Len())
}
parsedBody, err := frames.ParseBody(buffer.New(b))
if err != nil {
return frames.Frame{}, err
}
return frames.Frame{Channel: currentHeader.Channel, Body: parsedBody}, nil
}
}