message.go (253 lines of code) (raw):
package amqp
import (
"fmt"
"time"
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
)
// Message is an AMQP message.
//
// The exported fields correspond to the sections of an AMQP message. Marshal
// writes the sections that are set in this order: header, delivery
// annotations, message annotations, properties, application properties,
// body (Data, Value, Sequence) and footer.
type Message struct {
// Message format code.
//
// The upper three octets of a message format code identify a particular message
// format. The lowest octet indicates the version of said message format. Any
// given version of a format is forwards compatible with all higher versions.
Format uint32
// The DeliveryTag can be up to 32 octets of binary data.
// Note that when mode one is enabled there will be no delivery tag.
DeliveryTag []byte
// The header section carries standard delivery details about the transfer
// of a message through the AMQP network.
//
// If the header section is omitted the receiver MUST assume the appropriate
// default values (or the meaning implied by no value being set) for the
// fields within the header unless other target or node specific defaults
// have otherwise been set.
Header *MessageHeader
// The delivery-annotations section is used for delivery-specific non-standard
// properties at the head of the message. Delivery annotations convey information
// from the sending peer to the receiving peer.
//
// If the recipient does not understand the annotation it cannot be acted upon
// and its effects (such as any implied propagation) cannot be acted upon.
// Annotations might be specific to one implementation, or common to multiple
// implementations. The capabilities negotiated on link attach and on the source
// and target SHOULD be used to establish which annotations a peer supports. A
// registry of defined annotations and their meanings is maintained [AMQPDELANN].
// The symbolic key "rejected" is reserved for the use of communicating error
// information regarding rejected messages. Any values associated with the
// "rejected" key MUST be of type error.
//
// If the delivery-annotations section is omitted, it is equivalent to a
// delivery-annotations section containing an empty map of annotations.
DeliveryAnnotations Annotations
// The message-annotations section is used for properties of the message which
// are aimed at the infrastructure and SHOULD be propagated across every
// delivery step. Message annotations convey information about the message.
// Intermediaries MUST propagate the annotations unless the annotations are
// explicitly augmented or modified (e.g., by the use of the modified outcome).
//
// The capabilities negotiated on link attach and on the source and target can
// be used to establish which annotations a peer understands; however, in a
// network of AMQP intermediaries it might not be possible to know if every
// intermediary will understand the annotation. Note that for some annotations
// it might not be necessary for the intermediary to understand their purpose,
// i.e., they could be used purely as an attribute which can be filtered on.
//
// A registry of defined annotations and their meanings is maintained [AMQPMESSANN].
//
// If the message-annotations section is omitted, it is equivalent to a
// message-annotations section containing an empty map of annotations.
Annotations Annotations
// The properties section is used for a defined set of standard properties of
// the message.
//
// The properties section is part of the bare message; therefore,
// if retransmitted by an intermediary, it MUST remain unaltered.
Properties *MessageProperties
// The application-properties section is a part of the bare message used for
// structured application data. Intermediaries can use the data within this
// structure for the purposes of filtering or routing.
//
// The keys of this map are restricted to be of type string (which excludes
// the possibility of a null key) and the values are restricted to be of
// simple types only, that is, excluding map, list, and array types.
ApplicationProperties map[string]any
// NOTE: the Data, Value, and Sequence fields are mutually exclusive.
// Marshal does not enforce this; it writes every body field that is set.
//
// Data payloads.
// A data section contains opaque binary data.
// Each element is written as its own data section.
Data [][]byte
// Value payload.
// An amqp-value section contains a single AMQP value.
// To send an AMQP null, populate with a [Null].
Value any
// Sequence will contain AMQP sequence sections from the body of the message.
// An amqp-sequence section contains an AMQP sequence.
// Each element is written as its own amqp-sequence section.
Sequence [][]any
// The footer section is used for details about the message or delivery which
// can only be calculated or evaluated once the whole bare message has been
// constructed or seen (for example message hashes, HMACs, signatures and
// encryption details).
Footer Annotations
deliveryID uint32 // used when sending disposition
settled bool // whether transfer was settled by sender
rcv *Receiver // used to settle message on the corresponding Receiver (nil if settled == true)
}
// NewMessage builds a *Message whose Data field holds data as its only payload.
//
// It is a convenience for the common case of a message with a single data
// section. For anything more elaborate, including messages that use the
// Value or Sequence fields, construct the Message value directly instead.
func NewMessage(data []byte) *Message {
	msg := new(Message)
	msg.Data = [][]byte{data}
	return msg
}
// Null represents the AMQP null value.
// Its typical use is as the [Message.Value] of a message that sends a null.
//
// https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-null
type Null struct{}

// Marshal appends the one-byte AMQP null type code to wr.
// It always returns nil.
func (Null) Marshal(wr *buffer.Buffer) error {
	nullCode := byte(encoding.TypeCodeNull)
	wr.AppendByte(nullCode)
	return nil
}
// GetData returns the first payload stored in the Data field.
// It returns nil when Data holds no payloads.
func (m *Message) GetData() []byte {
	if len(m.Data) == 0 {
		return nil
	}
	first := m.Data[0]
	return first
}
// MarshalBinary encodes the message into binary form.
//
// It marshals the message into a fresh buffer and returns the buffer's
// contents. If encoding fails, the error from Marshal is returned together
// with a nil slice, so callers never receive a partially encoded message.
func (m *Message) MarshalBinary() ([]byte, error) {
	var buf buffer.Buffer
	if err := m.Marshal(&buf); err != nil {
		// Whatever was written before the failure is an incomplete message;
		// discard it instead of returning it next to the error.
		return nil, err
	}
	return buf.Detach(), nil
}
// Marshal encodes the message into wr, one section at a time.
//
// Only sections whose field is non-nil are written, in this order: header,
// delivery annotations, message annotations, properties, application
// properties, data, value, sequence and footer. Every element of Data and of
// Sequence becomes a section of its own. Encoding stops at the first error,
// which is returned; bytes already written to wr are left in place.
func (m *Message) Marshal(wr *buffer.Buffer) error {
	// writeSection emits a section descriptor followed by the encoded body.
	writeSection := func(code encoding.AMQPType, body any) error {
		encoding.WriteDescriptor(wr, code)
		return encoding.Marshal(wr, body)
	}

	// The header and properties marshal without a separate WriteDescriptor call.
	if m.Header != nil {
		if err := m.Header.Marshal(wr); err != nil {
			return err
		}
	}

	if m.DeliveryAnnotations != nil {
		if err := writeSection(encoding.TypeCodeDeliveryAnnotations, m.DeliveryAnnotations); err != nil {
			return err
		}
	}

	if m.Annotations != nil {
		if err := writeSection(encoding.TypeCodeMessageAnnotations, m.Annotations); err != nil {
			return err
		}
	}

	if m.Properties != nil {
		if err := encoding.Marshal(wr, m.Properties); err != nil {
			return err
		}
	}

	if m.ApplicationProperties != nil {
		if err := writeSection(encoding.TypeCodeApplicationProperties, m.ApplicationProperties); err != nil {
			return err
		}
	}

	// The body is one of data, value or sequence. Data and sequence bodies
	// consist of several sections, one per element.
	for _, payload := range m.Data {
		encoding.WriteDescriptor(wr, encoding.TypeCodeApplicationData)
		if err := encoding.WriteBinary(wr, payload); err != nil {
			return err
		}
	}

	if m.Value != nil {
		if err := writeSection(encoding.TypeCodeAMQPValue, m.Value); err != nil {
			return err
		}
	}

	// Ranging over a nil Sequence is a no-op, so no explicit nil check is needed.
	for _, seq := range m.Sequence {
		if err := writeSection(encoding.TypeCodeAMQPSequence, seq); err != nil {
			return err
		}
	}

	if m.Footer != nil {
		if err := writeSection(encoding.TypeCodeFooter, m.Footer); err != nil {
			return err
		}
	}

	return nil
}
// UnmarshalBinary decodes the message from its binary form.
//
// data is wrapped in a buffer and handed to Unmarshal, whose error (if any)
// is returned unchanged.
func (m *Message) UnmarshalBinary(data []byte) error {
	return m.Unmarshal(buffer.New(data))
}
// Unmarshal decodes consecutive message sections from r into m until r is
// exhausted.
//
// Data and sequence sections are appended to m.Data and m.Sequence, so a
// message may carry several of each; every other section is decoded into its
// corresponding field. An unrecognized section type yields an error.
func (m *Message) Unmarshal(r *buffer.Buffer) error {
	for r.Len() > 0 {
		// Peek at the next section's type without consuming it.
		sectionType, headerLen, err := encoding.PeekMessageType(r.Bytes())
		if err != nil {
			return err
		}

		// dest is where the section gets decoded. skipHeader reports whether
		// the section header is dropped from r before decoding; it is cleared
		// for the header and properties sections, which are passed to
		// encoding.Unmarshal with their section header still in place.
		var dest any
		skipHeader := true

		switch encoding.AMQPType(sectionType) {
		case encoding.TypeCodeApplicationData:
			// Each data section adds one more payload.
			r.Skip(int(headerLen))
			var payload []byte
			if err = encoding.Unmarshal(r, &payload); err != nil {
				return err
			}
			m.Data = append(m.Data, payload)
			continue

		case encoding.TypeCodeAMQPSequence:
			// Each sequence section adds one more sequence.
			r.Skip(int(headerLen))
			var seq []any
			if err = encoding.Unmarshal(r, &seq); err != nil {
				return err
			}
			m.Sequence = append(m.Sequence, seq)
			continue

		case encoding.TypeCodeMessageHeader:
			dest, skipHeader = &m.Header, false

		case encoding.TypeCodeMessageProperties:
			dest, skipHeader = &m.Properties, false

		case encoding.TypeCodeDeliveryAnnotations:
			dest = &m.DeliveryAnnotations

		case encoding.TypeCodeMessageAnnotations:
			dest = &m.Annotations

		case encoding.TypeCodeApplicationProperties:
			dest = &m.ApplicationProperties

		case encoding.TypeCodeAMQPValue:
			dest = &m.Value

		case encoding.TypeCodeFooter:
			dest = &m.Footer

		default:
			return fmt.Errorf("unknown message section %#02x", sectionType)
		}

		if skipHeader {
			r.Skip(int(headerLen))
		}

		if err = encoding.Unmarshal(r, dest); err != nil {
			return err
		}
	}

	return nil
}
// onSettlement records that the message has been settled: it drops the
// reference to the Receiver and sets the settled flag.
func (m *Message) onSettlement() {
	m.rcv = nil
	m.settled = true
}
/*
<type name="header" class="composite" source="list" provides="section">
<descriptor name="amqp:header:list" code="0x00000000:0x00000070"/>
<field name="durable" type="boolean" default="false"/>
<field name="priority" type="ubyte" default="4"/>
<field name="ttl" type="milliseconds"/>
<field name="first-acquirer" type="boolean" default="false"/>
<field name="delivery-count" type="uint" default="0"/>
</type>
*/
// MessageHeader carries standard delivery details about the transfer
// of a message.
//
// The fields map, in order, to the fields of the AMQP header composite
// type shown above.
type MessageHeader struct {
// Durable corresponds to the "durable" header field (boolean, default false).
Durable bool
// Priority corresponds to the "priority" header field (ubyte, default 4).
// Marshal marks the field as omitted when it equals 4, and Unmarshal sets it
// to 4 when the received field is null.
// NOTE: the Go zero value is 0, not 4, so a zero-valued MessageHeader is
// marshaled with an explicit priority of 0.
Priority uint8
// TTL corresponds to the "ttl" header field, which is expressed in
// milliseconds. Marshal marks the field as omitted when it is zero.
TTL time.Duration // from milliseconds
// FirstAcquirer corresponds to the "first-acquirer" header field
// (boolean, default false).
FirstAcquirer bool
// DeliveryCount corresponds to the "delivery-count" header field
// (uint, default 0).
DeliveryCount uint32
}
// Marshal encodes the header into wr as an AMQP composite.
//
// Durable is never marked for omission. Every other field is marked as
// omitted when it holds its default: Priority 4, TTL 0, FirstAcquirer false
// and DeliveryCount 0.
func (h *MessageHeader) Marshal(wr *buffer.Buffer) error {
	// TTL is encoded as AMQP milliseconds.
	ttl := (*encoding.Milliseconds)(&h.TTL)

	fields := []encoding.MarshalField{
		{Value: &h.Durable},
		{Value: &h.Priority, Omit: h.Priority == 4},
		{Value: ttl, Omit: h.TTL == 0},
		{Value: &h.FirstAcquirer, Omit: !h.FirstAcquirer},
		{Value: &h.DeliveryCount, Omit: h.DeliveryCount == 0},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeMessageHeader, fields)
}
// Unmarshal decodes an AMQP header composite from r into h.
//
// The priority field carries a HandleNull callback that sets Priority to 4,
// the default given in the header type definition.
func (h *MessageHeader) Unmarshal(r *buffer.Buffer) error {
	defaultPriority := func() error {
		h.Priority = 4
		return nil
	}

	fields := []encoding.UnmarshalField{
		{Field: &h.Durable},
		{Field: &h.Priority, HandleNull: defaultPriority},
		{Field: (*encoding.Milliseconds)(&h.TTL)},
		{Field: &h.FirstAcquirer},
		{Field: &h.DeliveryCount},
	}
	return encoding.UnmarshalComposite(r, encoding.TypeCodeMessageHeader, fields...)
}
/*
<type name="properties" class="composite" source="list" provides="section">
<descriptor name="amqp:properties:list" code="0x00000000:0x00000073"/>
<field name="message-id" type="*" requires="message-id"/>
<field name="user-id" type="binary"/>
<field name="to" type="*" requires="address"/>
<field name="subject" type="string"/>
<field name="reply-to" type="*" requires="address"/>
<field name="correlation-id" type="*" requires="message-id"/>
<field name="content-type" type="symbol"/>
<field name="content-encoding" type="symbol"/>
<field name="absolute-expiry-time" type="timestamp"/>
<field name="creation-time" type="timestamp"/>
<field name="group-id" type="string"/>
<field name="group-sequence" type="sequence-no"/>
<field name="reply-to-group-id" type="string"/>
</type>
*/
// MessageProperties is the defined set of properties for AMQP messages.
//
// The fields map, in order, to the fields of the AMQP properties composite
// type shown above. When marshaling, a field that is nil (or, for UserID,
// empty) is marked as omitted.
type MessageProperties struct {
// Message-id, if set, uniquely identifies a message within the message system.
// The message producer is usually responsible for setting the message-id in
// such a way that it is assured to be globally unique. A broker MAY discard a
// message as a duplicate if the value of the message-id matches that of a
// previously received message sent to the same node.
//
// The value is restricted to the following types
// - uint64, UUID, []byte, or string
MessageID any
// The identity of the user responsible for producing the message.
// The client sets this value, and it MAY be authenticated by intermediaries.
UserID []byte
// The to field identifies the node that is the intended destination of the message.
// On any given transfer this might not be the node at the receiving end of the link.
To *string
// A common field for summary information about the message content and purpose.
Subject *string
// The address of the node to send replies to.
ReplyTo *string
// This is a client-specific id that can be used to mark or identify messages
// between clients.
//
// The value is restricted to the following types
// - uint64, UUID, []byte, or string
CorrelationID any
// The RFC-2046 [RFC2046] MIME type for the message's application-data section
// (body). As per RFC-2046 [RFC2046] this can contain a charset parameter defining
// the character encoding used: e.g., 'text/plain; charset="utf-8"'.
//
// For clarity, as per section 7.2.1 of RFC-2616 [RFC2616], where the content type
// is unknown the content-type SHOULD NOT be set. This allows the recipient the
// opportunity to determine the actual type. Where the section is known to be truly
// opaque binary data, the content-type SHOULD be set to application/octet-stream.
//
// When using an application-data section with a section code other than data,
// content-type SHOULD NOT be set.
//
// The value is marshaled as an AMQP symbol.
ContentType *string
// The content-encoding property is used as a modifier to the content-type.
// When present, its value indicates what additional content encodings have been
// applied to the application-data, and thus what decoding mechanisms need to be
// applied in order to obtain the media-type referenced by the content-type header
// field.
//
// Content-encoding is primarily used to allow a document to be compressed without
// losing the identity of its underlying content type.
//
// Content-encodings are to be interpreted as per section 3.5 of RFC 2616 [RFC2616].
// Valid content-encodings are registered at IANA [IANAHTTPPARAMS].
//
// The content-encoding MUST NOT be set when the application-data section is other
// than data. The binary representation of all other application-data section types
// is defined completely in terms of the AMQP type system.
//
// Implementations MUST NOT use the identity encoding. Instead, implementations
// SHOULD NOT set this property. Implementations SHOULD NOT use the compress encoding,
// except as to remain compatible with messages originally sent with other protocols,
// e.g. HTTP or SMTP.
//
// Implementations SHOULD NOT specify multiple content-encoding values except as to
// be compatible with messages originally sent with other protocols, e.g. HTTP or SMTP.
//
// The value is marshaled as an AMQP symbol.
ContentEncoding *string
// An absolute time when this message is considered to be expired.
AbsoluteExpiryTime *time.Time
// An absolute time when this message was created.
CreationTime *time.Time
// Identifies the group the message belongs to.
GroupID *string
// The relative position of this message within its group.
//
// The value is defined as a RFC-1982 sequence number
GroupSequence *uint32
// This is a client-specific id that is used so that client can send replies to this
// message to a specific group.
ReplyToGroupID *string
}
// Marshal encodes the properties into wr as an AMQP composite.
//
// Fields are listed in the order of the properties type definition. A field
// is marked as omitted when it is nil; UserID is marked as omitted when it is
// empty.
func (p *MessageProperties) Marshal(wr *buffer.Buffer) error {
	// content-type and content-encoding are symbols, not strings.
	contentType := (*encoding.Symbol)(p.ContentType)
	contentEncoding := (*encoding.Symbol)(p.ContentEncoding)

	fields := []encoding.MarshalField{
		{Value: p.MessageID, Omit: p.MessageID == nil},
		{Value: &p.UserID, Omit: len(p.UserID) == 0},
		{Value: p.To, Omit: p.To == nil},
		{Value: p.Subject, Omit: p.Subject == nil},
		{Value: p.ReplyTo, Omit: p.ReplyTo == nil},
		{Value: p.CorrelationID, Omit: p.CorrelationID == nil},
		{Value: contentType, Omit: p.ContentType == nil},
		{Value: contentEncoding, Omit: p.ContentEncoding == nil},
		{Value: p.AbsoluteExpiryTime, Omit: p.AbsoluteExpiryTime == nil},
		{Value: p.CreationTime, Omit: p.CreationTime == nil},
		{Value: p.GroupID, Omit: p.GroupID == nil},
		{Value: p.GroupSequence, Omit: p.GroupSequence == nil},
		{Value: p.ReplyToGroupID, Omit: p.ReplyToGroupID == nil},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeMessageProperties, fields)
}
// Unmarshal decodes an AMQP properties composite from r into p.
//
// The destination fields are listed in the order of the properties type
// definition.
func (p *MessageProperties) Unmarshal(r *buffer.Buffer) error {
	fields := []encoding.UnmarshalField{
		{Field: &p.MessageID},
		{Field: &p.UserID},
		{Field: &p.To},
		{Field: &p.Subject},
		{Field: &p.ReplyTo},
		{Field: &p.CorrelationID},
		{Field: &p.ContentType},
		{Field: &p.ContentEncoding},
		{Field: &p.AbsoluteExpiryTime},
		{Field: &p.CreationTime},
		{Field: &p.GroupID},
		{Field: &p.GroupSequence},
		{Field: &p.ReplyToGroupID},
	}
	return encoding.UnmarshalComposite(r, encoding.TypeCodeMessageProperties, fields...)
}
// Aliases that re-export types from the internal encoding package.
type (
	// Annotations is a map of annotations. Its keys must be of type string,
	// int, or int64.
	//
	// String keys are encoded as AMQP Symbols.
	Annotations = encoding.Annotations

	// UUID is a 128 bit identifier as defined in RFC 4122.
	UUID = encoding.UUID

	// Symbol is an AMQP symbolic string.
	Symbol = encoding.Symbol
)