internal/frames/parsing.go (112 lines of code) (raw):

package frames import ( "encoding/binary" "errors" "fmt" "math" "github.com/Azure/go-amqp/internal/buffer" "github.com/Azure/go-amqp/internal/encoding" ) const HeaderSize = 8 // Frame structure: // // header (8 bytes) // 0-3: SIZE (total size, at least 8 bytes for header, uint32) // 4: DOFF (data offset,at least 2, count of 4 bytes words, uint8) // 5: TYPE (frame type) // 0x0: AMQP // 0x1: SASL // 6-7: type dependent (channel for AMQP) // extended header (opt) // body (opt) // Header in a structure appropriate for use with binary.Read() type Header struct { // size: an unsigned 32-bit integer that MUST contain the total frame size of the frame header, // extended header, and frame body. The frame is malformed if the size is less than the size of // the frame header (8 bytes). Size uint32 // doff: gives the position of the body within the frame. The value of the data offset is an // unsigned, 8-bit integer specifying a count of 4-byte words. Due to the mandatory 8-byte // frame header, the frame is malformed if the value is less than 2. DataOffset uint8 FrameType uint8 Channel uint16 } // ParseHeader reads the header from r and returns the result. // // No validation is done. func ParseHeader(r *buffer.Buffer) (Header, error) { buf, ok := r.Next(8) if !ok { return Header{}, errors.New("invalid frameHeader") } _ = buf[7] fh := Header{ Size: binary.BigEndian.Uint32(buf[0:4]), DataOffset: buf[4], FrameType: buf[5], Channel: binary.BigEndian.Uint16(buf[6:8]), } if fh.Size < HeaderSize { return fh, fmt.Errorf("received frame header with invalid size %d", fh.Size) } if fh.DataOffset < 2 { return fh, fmt.Errorf("received frame header with invalid data offset %d", fh.DataOffset) } return fh, nil } // ParseBody reads and unmarshals an AMQP frame. func ParseBody(r *buffer.Buffer) (FrameBody, error) { payload := r.Bytes() if r.Len() < 3 || payload[0] != 0 || encoding.AMQPType(payload[1]) != encoding.TypeCodeSmallUlong { return nil, errors.New("invalid frame body header") } switch pType := encoding.AMQPType(payload[2]); pType { case encoding.TypeCodeOpen: t := new(PerformOpen) err := t.Unmarshal(r) return t, err case encoding.TypeCodeBegin: t := new(PerformBegin) err := t.Unmarshal(r) return t, err case encoding.TypeCodeAttach: t := new(PerformAttach) err := t.Unmarshal(r) return t, err case encoding.TypeCodeFlow: t := new(PerformFlow) err := t.Unmarshal(r) return t, err case encoding.TypeCodeTransfer: t := new(PerformTransfer) err := t.Unmarshal(r) return t, err case encoding.TypeCodeDisposition: t := new(PerformDisposition) err := t.Unmarshal(r) return t, err case encoding.TypeCodeDetach: t := new(PerformDetach) err := t.Unmarshal(r) return t, err case encoding.TypeCodeEnd: t := new(PerformEnd) err := t.Unmarshal(r) return t, err case encoding.TypeCodeClose: t := new(PerformClose) err := t.Unmarshal(r) return t, err case encoding.TypeCodeSASLMechanism: t := new(SASLMechanisms) err := t.Unmarshal(r) return t, err case encoding.TypeCodeSASLChallenge: t := new(SASLChallenge) err := t.Unmarshal(r) return t, err case encoding.TypeCodeSASLOutcome: t := new(SASLOutcome) err := t.Unmarshal(r) return t, err default: return nil, fmt.Errorf("unknown performative type %02x", pType) } } // Write encodes fr into buf. // split out from conn.WriteFrame for testing purposes. func Write(buf *buffer.Buffer, fr Frame) error { // write header buf.Append([]byte{ 0, 0, 0, 0, // size, overwrite later 2, // doff, see frameHeader.DataOffset comment uint8(fr.Type), // frame type }) buf.AppendUint16(fr.Channel) // channel // write AMQP frame body err := encoding.Marshal(buf, fr.Body) if err != nil { return err } // validate size if uint(buf.Len()) > math.MaxUint32 { return errors.New("frame too large") } // retrieve raw bytes bufBytes := buf.Bytes() // write correct size binary.BigEndian.PutUint32(bufBytes, uint32(len(bufBytes))) return nil }