func()

in pulsar/internal/connection_reader.go [64:109]


func (r *connectionReader) readSingleCommand() (cmd *pb.BaseCommand, headersAndPayload Buffer, err error) {
	// First, we need to read the frame size
	if r.buffer.ReadableBytes() < 4 {
		if r.buffer.ReadableBytes() == 0 {
			// If the buffer is empty, just go back to write at the beginning
			r.buffer.Clear()
		}
		if err := r.readAtLeast(4); err != nil {
			return nil, nil, fmt.Errorf("unable to read frame size: %+v", err)
		}
	}

	// We have enough to read frame size
	frameSize := r.buffer.ReadUint32()
	maxFrameSize := r.cnx.maxMessageSize + MessageFramePadding
	if r.cnx.maxMessageSize != 0 && int32(frameSize) > maxFrameSize {
		frameSizeError := fmt.Errorf("received too big frame size=%d maxFrameSize=%d", frameSize, maxFrameSize)
		r.cnx.log.Error(frameSizeError)
		r.cnx.Close()
		return nil, nil, frameSizeError
	}

	// Next, we read the rest of the frame
	if r.buffer.ReadableBytes() < frameSize {
		remainingBytes := frameSize - r.buffer.ReadableBytes()
		if err := r.readAtLeast(remainingBytes); err != nil {
			return nil, nil,
				fmt.Errorf("unable to read frame: %+v", err)
		}
	}

	// We have now the complete frame
	cmdSize := r.buffer.ReadUint32()
	cmd, err = r.deserializeCmd(r.buffer.Read(cmdSize))
	if err != nil {
		return nil, nil, err
	}

	// Also read the eventual payload
	headersAndPayloadSize := frameSize - (cmdSize + 4)
	if cmdSize+4 < frameSize {
		headersAndPayload = NewBuffer(int(headersAndPayloadSize))
		headersAndPayload.Write(r.buffer.Read(headersAndPayloadSize))
	}
	return cmd, headersAndPayload, nil
}