in pulsar/internal/connection_reader.go [64:109]
// readSingleCommand reads one complete frame from the connection buffer and
// decodes the command it carries.
//
// As parsed here, a frame is a 4-byte frame size followed by that many bytes:
// a 4-byte command size, the serialized command, and any remaining bytes,
// which are returned as headersAndPayload (nil when the frame holds only the
// command).
//
// An error is returned when the underlying read fails, when the frame size
// exceeds the connection's limit (maxMessageSize + MessageFramePadding, only
// enforced when maxMessageSize is non-zero; the connection is also closed in
// that case), when the declared sizes are inconsistent with each other, or
// when the command cannot be deserialized.
func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload Buffer, err error) {
	// First, we need to read the frame size
	if r.buffer.ReadableBytes() < 4 {
		if r.buffer.ReadableBytes() == 0 {
			// If the buffer is empty, just go back to write at the beginning
			r.buffer.Clear()
		}
		if err := r.readAtLeast(4); err != nil {
			return nil, nil, fmt.Errorf("unable to read frame size: %+v", err)
		}
	}

	// We have enough to read frame size
	frameSize := r.buffer.ReadUint32()

	// Compare in int64: narrowing the unsigned frame size to int32 would turn
	// any value >= 2^31 negative and let it slip under the limit, and the sum
	// of the two limits could itself wrap in int32.
	maxFrameSize := int64(r.cnx.maxMessageSize) + int64(MessageFramePadding)
	if r.cnx.maxMessageSize != 0 && int64(frameSize) > maxFrameSize {
		frameSizeError := fmt.Errorf("received too big frame size=%d maxFrameSize=%d", frameSize, maxFrameSize)
		r.cnx.log.Error(frameSizeError)
		r.cnx.Close()
		return nil, nil, frameSizeError
	}

	// The frame must at least contain the 4-byte command size; otherwise the
	// read of the command size below would consume bytes outside the frame.
	if frameSize < 4 {
		return nil, nil, fmt.Errorf("received too small frame size=%d", frameSize)
	}

	// Next, we read the rest of the frame
	if r.buffer.ReadableBytes() < frameSize {
		remainingBytes := frameSize - r.buffer.ReadableBytes()
		if err := r.readAtLeast(remainingBytes); err != nil {
			return nil, nil,
				fmt.Errorf("unable to read frame: %+v", err)
		}
	}

	// We have now the complete frame
	cmdSize := r.buffer.ReadUint32()

	// The command must fit in what is left of the frame. Written as a
	// subtraction (safe because frameSize >= 4) so that a huge cmdSize cannot
	// wrap around the way cmdSize+4 would.
	if cmdSize > frameSize-4 {
		return nil, nil, fmt.Errorf("received invalid command size=%d for frame size=%d", cmdSize, frameSize)
	}

	cmd, err = r.deserializeCmd(r.buffer.Read(cmdSize))
	if err != nil {
		return nil, nil, err
	}

	// Also read the eventual payload: whatever remains of the frame after the
	// command size field and the command itself.
	headersAndPayloadSize := frameSize - 4 - cmdSize
	if headersAndPayloadSize > 0 {
		headersAndPayload = NewBuffer(int(headersAndPayloadSize))
		headersAndPayload.Write(r.buffer.Read(headersAndPayloadSize))
	}
	return cmd, headersAndPayload, nil
}