internal/frames/frames.go (667 lines of code) (raw):
package frames
import (
"errors"
"fmt"
"strconv"
"time"
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
)
// Type contains the values for a frame's type.
type Type uint8

// Frame type values.
const (
	TypeAMQP Type = 0x0
	TypeSASL Type = 0x1
)

// String implements the fmt.Stringer interface for type Type.
//
// TypeAMQP and TypeSASL are rendered as "AMQP" and "SASL". Any other value
// is rendered as "Type(0x..)" (lower-case hex) so that an unrecognized type
// byte is not mistaken for a SASL frame when printed.
func (t Type) String() string {
	switch t {
	case TypeAMQP:
		return "AMQP"
	case TypeSASL:
		return "SASL"
	default:
		return "Type(0x" + strconv.FormatUint(uint64(t), 16) + ")"
	}
}
/*
<type name="source" class="composite" source="list" provides="source">
<descriptor name="amqp:source:list" code="0x00000000:0x00000028"/>
<field name="address" type="*" requires="address"/>
<field name="durable" type="terminus-durability" default="none"/>
<field name="expiry-policy" type="terminus-expiry-policy" default="session-end"/>
<field name="timeout" type="seconds" default="0"/>
<field name="dynamic" type="boolean" default="false"/>
<field name="dynamic-node-properties" type="node-properties"/>
<field name="distribution-mode" type="symbol" requires="distribution-mode"/>
<field name="filter" type="filter-set"/>
<field name="default-outcome" type="*" requires="outcome"/>
<field name="outcomes" type="symbol" multiple="true"/>
<field name="capabilities" type="symbol" multiple="true"/>
</type>
*/
// Source holds the fields of the AMQP "amqp:source:list" composite type.
// Fields appear in the same order as in the specification's field list.
type Source struct {
// the address of the source
//
// The address of the source MUST NOT be set when sent on an attach frame sent by
// the receiving link endpoint where the dynamic flag is set to true (that is where
// the receiver is requesting the sender to create an addressable node).
//
// The address of the source MUST be set when sent on an attach frame sent by the
// sending link endpoint where the dynamic flag is set to true (that is where the
// sender has created an addressable node at the request of the receiver and is now
// communicating the address of that created node). The generated name of the address
// SHOULD include the link name and the container-id of the remote container to allow
// for ease of identification.
Address string
// indicates the durability of the terminus
//
// Indicates what state of the terminus will be retained durably: the state of durable
// messages, only existence and configuration of the terminus, or no state at all.
//
// 0: none
// 1: configuration
// 2: unsettled-state
Durable encoding.Durability
// the expiry policy of the source
//
// link-detach: The expiry timer starts when the terminus is detached.
// session-end: The expiry timer starts when the most recently associated session is
// ended.
// connection-close: The expiry timer starts when the most recently associated connection
// is closed.
// never: The terminus never expires.
ExpiryPolicy encoding.ExpiryPolicy
// duration that an expiring source will be retained
//
// The source starts expiring as indicated by the expiry-policy.
Timeout uint32 // seconds
// request dynamic creation of a remote node
//
// When set to true by the receiving link endpoint, this field constitutes a request
// for the sending peer to dynamically create a node at the source. In this case the
// address field MUST NOT be set.
//
// When set to true by the sending link endpoint this field indicates creation of a
// dynamically created node. In this case the address field will contain the address
// of the created node. The generated address SHOULD include the link name and other
// available information on the initiator of the request (such as the remote
// container-id) in some recognizable form for ease of traceability.
Dynamic bool
// properties of the dynamically created node
//
// If the dynamic field is not set to true this field MUST be left unset.
//
// When set by the receiving link endpoint, this field contains the desired
// properties of the node the receiver wishes to be created. When set by the
// sending link endpoint this field contains the actual properties of the
// dynamically created node. See subsection 3.5.9 for standard node properties.
// http://www.amqp.org/specification/1.0/node-properties
//
// lifetime-policy: The lifetime of a dynamically generated node.
// Definitionally, the lifetime will never be less than the lifetime
// of the link which caused its creation, however it is possible to
// extend the lifetime of a dynamically created node using a lifetime
// policy. The value of this entry MUST be of a type which provides
// the lifetime-policy archetype. The following standard
// lifetime-policies are defined below: delete-on-close,
// delete-on-no-links, delete-on-no-messages or
// delete-on-no-links-or-messages.
// supported-dist-modes: The distribution modes that the node supports.
// The value of this entry MUST be one or more symbols which are valid
// distribution-modes. That is, the value MUST be of the same type as
// would be valid in a field defined with the following attributes:
// type="symbol" multiple="true" requires="distribution-mode"
DynamicNodeProperties map[encoding.Symbol]any // TODO: implement custom type with validation
// the distribution mode of the link
//
// This field MUST be set by the sending end of the link if the endpoint supports more
// than one distribution-mode. This field MAY be set by the receiving end of the link
// to indicate a preference when a node supports multiple distribution modes.
DistributionMode encoding.Symbol
// a set of predicates to filter the messages admitted onto the link
//
// The receiving endpoint sets its desired filter, the sending endpoint sets the filter
// actually in place (including any filters defaulted at the node). The receiving
// endpoint MUST check that the filter in place meets its needs and take responsibility
// for detaching if it does not.
Filter encoding.Filter
// default outcome for unsettled transfers
//
// Indicates the outcome to be used for transfers that have not reached a terminal
// state at the receiver when the transfer is settled, including when the source
// is destroyed. The value MUST be a valid outcome (e.g., released or rejected).
DefaultOutcome any
// descriptors for the outcomes that can be chosen on this link
//
// The values in this field are the symbolic descriptors of the outcomes that can
// be chosen on this link. This field MAY be empty, indicating that the default-outcome
// will be assumed for all message transfers (if the default-outcome is not set, and no
// outcomes are provided, then the accepted outcome MUST be supported by the source).
//
// When present, the values MUST be a symbolic descriptor of a valid outcome,
// e.g., "amqp:accepted:list".
Outcomes encoding.MultiSymbol
// the extension capabilities the sender supports/desires
//
// http://www.amqp.org/specification/1.0/source-capabilities
Capabilities encoding.MultiSymbol
}
// Marshal writes s to wr as a composite with type code encoding.TypeCodeSource.
//
// The field list is built in specification order. Each entry is flagged Omit
// when the field is empty or equal to its documented default (for example
// Durable == none, ExpiryPolicy == "" or session-end, Timeout == 0).
// It returns whatever error encoding.MarshalComposite reports.
func (s *Source) Marshal(wr *buffer.Buffer) error {
	fields := []encoding.MarshalField{
		{Value: &s.Address, Omit: s.Address == ""},
		{Value: &s.Durable, Omit: s.Durable == encoding.DurabilityNone},
		{Value: &s.ExpiryPolicy, Omit: s.ExpiryPolicy == "" || s.ExpiryPolicy == encoding.ExpirySessionEnd},
		{Value: &s.Timeout, Omit: s.Timeout == 0},
		{Value: &s.Dynamic, Omit: !s.Dynamic},
		{Value: s.DynamicNodeProperties, Omit: len(s.DynamicNodeProperties) == 0},
		{Value: &s.DistributionMode, Omit: s.DistributionMode == ""},
		{Value: s.Filter, Omit: len(s.Filter) == 0},
		{Value: &s.DefaultOutcome, Omit: s.DefaultOutcome == nil},
		{Value: &s.Outcomes, Omit: len(s.Outcomes) == 0},
		{Value: &s.Capabilities, Omit: len(s.Capabilities) == 0},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeSource, fields)
}
// Unmarshal populates s from a composite with type code
// encoding.TypeCodeSource read from r.
//
// Destinations are listed in specification order. ExpiryPolicy carries a
// HandleNull callback that assigns the session-end default; no other field
// has a null handler. It returns whatever error encoding.UnmarshalComposite
// reports.
func (s *Source) Unmarshal(r *buffer.Buffer) error {
	// Applies the spec default for expiry-policy.
	defaultExpiry := func() error {
		s.ExpiryPolicy = encoding.ExpirySessionEnd
		return nil
	}

	fields := []encoding.UnmarshalField{
		{Field: &s.Address},
		{Field: &s.Durable},
		{Field: &s.ExpiryPolicy, HandleNull: defaultExpiry},
		{Field: &s.Timeout},
		{Field: &s.Dynamic},
		{Field: &s.DynamicNodeProperties},
		{Field: &s.DistributionMode},
		{Field: &s.Filter},
		{Field: &s.DefaultOutcome},
		{Field: &s.Outcomes},
		{Field: &s.Capabilities},
	}
	return encoding.UnmarshalComposite(r, encoding.TypeCodeSource, fields...)
}
// String implements the fmt.Stringer interface for type Source.
//
// It renders every field, in declaration order, as a single line of the form
// "source{Name: value, ...}" with each field separated by ", ".
func (s Source) String() string {
	return fmt.Sprintf("source{Address: %s, Durable: %d, ExpiryPolicy: %s, Timeout: %d, "+
		"Dynamic: %t, DynamicNodeProperties: %v, DistributionMode: %s, Filter: %v, DefaultOutcome: %v, "+
		"Outcomes: %v, Capabilities: %v}",
		s.Address,
		s.Durable,
		s.ExpiryPolicy,
		s.Timeout,
		s.Dynamic,
		s.DynamicNodeProperties,
		s.DistributionMode,
		s.Filter,
		s.DefaultOutcome,
		s.Outcomes,
		s.Capabilities,
	)
}
/*
<type name="target" class="composite" source="list" provides="target">
<descriptor name="amqp:target:list" code="0x00000000:0x00000029"/>
<field name="address" type="*" requires="address"/>
<field name="durable" type="terminus-durability" default="none"/>
<field name="expiry-policy" type="terminus-expiry-policy" default="session-end"/>
<field name="timeout" type="seconds" default="0"/>
<field name="dynamic" type="boolean" default="false"/>
<field name="dynamic-node-properties" type="node-properties"/>
<field name="capabilities" type="symbol" multiple="true"/>
</type>
*/
// Target holds the fields of the AMQP "amqp:target:list" composite type.
// Fields appear in the same order as in the specification's field list.
type Target struct {
// the address of the target
//
// The address of the target MUST NOT be set when sent on an attach frame sent by
// the sending link endpoint where the dynamic flag is set to true (that is where
// the sender is requesting the receiver to create an addressable node).
//
// The address of the target MUST be set when sent on an attach frame sent by the
// receiving link endpoint where the dynamic flag is set to true (that is where
// the receiver has created an addressable node at the request of the sender and
// is now communicating the address of that created node). The generated name of
// the address SHOULD include the link name and the container-id of the remote
// container to allow for ease of identification.
Address string
// indicates the durability of the terminus
//
// Indicates what state of the terminus will be retained durably: the state of durable
// messages, only existence and configuration of the terminus, or no state at all.
//
// 0: none
// 1: configuration
// 2: unsettled-state
Durable encoding.Durability
// the expiry policy of the target
//
// link-detach: The expiry timer starts when the terminus is detached.
// session-end: The expiry timer starts when the most recently associated session is
// ended.
// connection-close: The expiry timer starts when the most recently associated connection
// is closed.
// never: The terminus never expires.
ExpiryPolicy encoding.ExpiryPolicy
// duration that an expiring target will be retained
//
// The target starts expiring as indicated by the expiry-policy.
Timeout uint32 // seconds
// request dynamic creation of a remote node
//
// When set to true by the sending link endpoint, this field constitutes a request
// for the receiving peer to dynamically create a node at the target. In this case
// the address field MUST NOT be set.
//
// When set to true by the receiving link endpoint this field indicates creation of
// a dynamically created node. In this case the address field will contain the
// address of the created node. The generated address SHOULD include the link name
// and other available information on the initiator of the request (such as the
// remote container-id) in some recognizable form for ease of traceability.
Dynamic bool
// properties of the dynamically created node
//
// If the dynamic field is not set to true this field MUST be left unset.
//
// When set by the sending link endpoint, this field contains the desired
// properties of the node the sender wishes to be created. When set by the
// receiving link endpoint this field contains the actual properties of the
// dynamically created node. See subsection 3.5.9 for standard node properties.
// http://www.amqp.org/specification/1.0/node-properties
//
// lifetime-policy: The lifetime of a dynamically generated node.
// Definitionally, the lifetime will never be less than the lifetime
// of the link which caused its creation, however it is possible to
// extend the lifetime of a dynamically created node using a lifetime
// policy. The value of this entry MUST be of a type which provides
// the lifetime-policy archetype. The following standard
// lifetime-policies are defined below: delete-on-close,
// delete-on-no-links, delete-on-no-messages or
// delete-on-no-links-or-messages.
// supported-dist-modes: The distribution modes that the node supports.
// The value of this entry MUST be one or more symbols which are valid
// distribution-modes. That is, the value MUST be of the same type as
// would be valid in a field defined with the following attributes:
// type="symbol" multiple="true" requires="distribution-mode"
DynamicNodeProperties map[encoding.Symbol]any // TODO: implement custom type with validation
// the extension capabilities the sender supports/desires
//
// http://www.amqp.org/specification/1.0/target-capabilities
Capabilities encoding.MultiSymbol
}
// Marshal writes t to wr as a composite with type code encoding.TypeCodeTarget.
//
// The field list is built in specification order. Each entry is flagged Omit
// when the field is empty or equal to its documented default (Durable == none,
// ExpiryPolicy == "" or session-end, Timeout == 0, Dynamic == false).
// It returns whatever error encoding.MarshalComposite reports.
func (t *Target) Marshal(wr *buffer.Buffer) error {
	fields := []encoding.MarshalField{
		{Value: &t.Address, Omit: t.Address == ""},
		{Value: &t.Durable, Omit: t.Durable == encoding.DurabilityNone},
		{Value: &t.ExpiryPolicy, Omit: t.ExpiryPolicy == "" || t.ExpiryPolicy == encoding.ExpirySessionEnd},
		{Value: &t.Timeout, Omit: t.Timeout == 0},
		{Value: &t.Dynamic, Omit: !t.Dynamic},
		{Value: t.DynamicNodeProperties, Omit: len(t.DynamicNodeProperties) == 0},
		{Value: &t.Capabilities, Omit: len(t.Capabilities) == 0},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeTarget, fields)
}
// Unmarshal populates t from a composite with type code
// encoding.TypeCodeTarget read from r.
//
// Destinations are listed in specification order. ExpiryPolicy carries a
// HandleNull callback that assigns the session-end default; no other field
// has a null handler. It returns whatever error encoding.UnmarshalComposite
// reports.
func (t *Target) Unmarshal(r *buffer.Buffer) error {
	// Applies the spec default for expiry-policy.
	defaultExpiry := func() error {
		t.ExpiryPolicy = encoding.ExpirySessionEnd
		return nil
	}

	fields := []encoding.UnmarshalField{
		{Field: &t.Address},
		{Field: &t.Durable},
		{Field: &t.ExpiryPolicy, HandleNull: defaultExpiry},
		{Field: &t.Timeout},
		{Field: &t.Dynamic},
		{Field: &t.DynamicNodeProperties},
		{Field: &t.Capabilities},
	}
	return encoding.UnmarshalComposite(r, encoding.TypeCodeTarget, fields...)
}
// String implements the fmt.Stringer interface for type Target.
//
// It renders every field, in declaration order, as a single line of the form
// "target{Name: value, ...}". The "target" label distinguishes the output
// from that of Source.String.
func (t Target) String() string {
	return fmt.Sprintf("target{Address: %s, Durable: %d, ExpiryPolicy: %s, Timeout: %d, "+
		"Dynamic: %t, DynamicNodeProperties: %v, Capabilities: %v}",
		t.Address,
		t.Durable,
		t.ExpiryPolicy,
		t.Timeout,
		t.Dynamic,
		t.DynamicNodeProperties,
		t.Capabilities,
	)
}
// Frame is the decoded representation of a frame: its type, the channel it
// was sent on, and its body.
type Frame struct {
Type Type // AMQP/SASL
Channel uint16 // channel this frame is for
Body FrameBody // body of the frame
}
// String implements the fmt.Stringer interface for type Frame.
//
// The result has the form "Frame{Type: ..., Channel: ..., Body: ...}"; Type
// and Body are formatted with the %s verb.
func (f Frame) String() string {
	const layout = "Frame{Type: %s, Channel: %d, Body: %s}"
	return fmt.Sprintf(layout, f.Type, f.Channel, f.Body)
}
// FrameBody adds some type safety to frame encoding.
//
// Its only method is unexported, so a type must declare frameBody() inside
// this package to be usable as a Frame's Body.
type FrameBody interface {
frameBody()
}
/*
<type name="open" class="composite" source="list" provides="frame">
<descriptor name="amqp:open:list" code="0x00000000:0x00000010"/>
<field name="container-id" type="string" mandatory="true"/>
<field name="hostname" type="string"/>
<field name="max-frame-size" type="uint" default="4294967295"/>
<field name="channel-max" type="ushort" default="65535"/>
<field name="idle-time-out" type="milliseconds"/>
<field name="outgoing-locales" type="ietf-language-tag" multiple="true"/>
<field name="incoming-locales" type="ietf-language-tag" multiple="true"/>
<field name="offered-capabilities" type="symbol" multiple="true"/>
<field name="desired-capabilities" type="symbol" multiple="true"/>
<field name="properties" type="fields"/>
</type>
*/
// PerformOpen holds the fields of the AMQP "amqp:open:list" performative.
// Fields appear in the same order as in the specification's field list.
type PerformOpen struct {
ContainerID string // required
Hostname string
MaxFrameSize uint32 // default: 4294967295
ChannelMax uint16 // default: 65535
IdleTimeout time.Duration // from milliseconds
OutgoingLocales encoding.MultiSymbol
IncomingLocales encoding.MultiSymbol
OfferedCapabilities encoding.MultiSymbol
DesiredCapabilities encoding.MultiSymbol
Properties map[encoding.Symbol]any
}
// frameBody marks *PerformOpen as an implementation of FrameBody.
func (o *PerformOpen) frameBody() {}
// Marshal writes o to wr as a composite with type code encoding.TypeCodeOpen.
//
// ContainerID is never flagged Omit. MaxFrameSize and ChannelMax are flagged
// Omit when they equal their spec defaults (4294967295 and 65535), and the
// remaining fields when they are empty or zero. IdleTimeout is passed as an
// *encoding.Milliseconds view of the time.Duration field.
// It returns whatever error encoding.MarshalComposite reports.
func (o *PerformOpen) Marshal(wr *buffer.Buffer) error {
	idleTimeout := (*encoding.Milliseconds)(&o.IdleTimeout)

	fields := []encoding.MarshalField{
		{Value: &o.ContainerID, Omit: false},
		{Value: &o.Hostname, Omit: o.Hostname == ""},
		{Value: &o.MaxFrameSize, Omit: o.MaxFrameSize == 4294967295},
		{Value: &o.ChannelMax, Omit: o.ChannelMax == 65535},
		{Value: idleTimeout, Omit: o.IdleTimeout == 0},
		{Value: &o.OutgoingLocales, Omit: len(o.OutgoingLocales) == 0},
		{Value: &o.IncomingLocales, Omit: len(o.IncomingLocales) == 0},
		{Value: &o.OfferedCapabilities, Omit: len(o.OfferedCapabilities) == 0},
		{Value: &o.DesiredCapabilities, Omit: len(o.DesiredCapabilities) == 0},
		{Value: o.Properties, Omit: len(o.Properties) == 0},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeOpen, fields)
}
// Unmarshal populates o from a composite with type code
// encoding.TypeCodeOpen read from r.
//
// Null handlers: ContainerID returns an error because the field is
// mandatory; MaxFrameSize and ChannelMax are assigned their spec defaults
// (4294967295 and 65535). IdleTimeout is decoded through an
// *encoding.Milliseconds view of the time.Duration field.
// It returns whatever error encoding.UnmarshalComposite reports.
func (o *PerformOpen) Unmarshal(r *buffer.Buffer) error {
	containerIDMissing := func() error {
		return errors.New("Open.ContainerID is required")
	}
	defaultMaxFrameSize := func() error {
		o.MaxFrameSize = 4294967295
		return nil
	}
	defaultChannelMax := func() error {
		o.ChannelMax = 65535
		return nil
	}

	fields := []encoding.UnmarshalField{
		{Field: &o.ContainerID, HandleNull: containerIDMissing},
		{Field: &o.Hostname},
		{Field: &o.MaxFrameSize, HandleNull: defaultMaxFrameSize},
		{Field: &o.ChannelMax, HandleNull: defaultChannelMax},
		{Field: (*encoding.Milliseconds)(&o.IdleTimeout)},
		{Field: &o.OutgoingLocales},
		{Field: &o.IncomingLocales},
		{Field: &o.OfferedCapabilities},
		{Field: &o.DesiredCapabilities},
		{Field: &o.Properties},
	}
	return encoding.UnmarshalComposite(r, encoding.TypeCodeOpen, fields...)
}
// String implements the fmt.Stringer interface for type *PerformOpen.
//
// It renders every field, in declaration order, as a single line of the form
// "Open{Name: value, ...}".
func (o *PerformOpen) String() string {
	const layout = "Open{ContainerID : %s, Hostname: %s, MaxFrameSize: %d, " +
		"ChannelMax: %d, IdleTimeout: %v, " +
		"OutgoingLocales: %v, IncomingLocales: %v, " +
		"OfferedCapabilities: %v, DesiredCapabilities: %v, " +
		"Properties: %v}"

	return fmt.Sprintf(layout,
		o.ContainerID,
		o.Hostname,
		o.MaxFrameSize,
		o.ChannelMax,
		o.IdleTimeout,
		o.OutgoingLocales,
		o.IncomingLocales,
		o.OfferedCapabilities,
		o.DesiredCapabilities,
		o.Properties,
	)
}
/*
<type name="begin" class="composite" source="list" provides="frame">
<descriptor name="amqp:begin:list" code="0x00000000:0x00000011"/>
<field name="remote-channel" type="ushort"/>
<field name="next-outgoing-id" type="transfer-number" mandatory="true"/>
<field name="incoming-window" type="uint" mandatory="true"/>
<field name="outgoing-window" type="uint" mandatory="true"/>
<field name="handle-max" type="handle" default="4294967295"/>
<field name="offered-capabilities" type="symbol" multiple="true"/>
<field name="desired-capabilities" type="symbol" multiple="true"/>
<field name="properties" type="fields"/>
</type>
*/
// PerformBegin holds the fields of the AMQP "amqp:begin:list" performative.
// Fields appear in the same order as in the specification's field list.
type PerformBegin struct {
// the remote channel for this session
// If a session is locally initiated, the remote-channel MUST NOT be set.
// When an endpoint responds to a remotely initiated session, the remote-channel
// MUST be set to the channel on which the remote session sent the begin.
RemoteChannel *uint16
// the transfer-id of the first transfer the sender will send
NextOutgoingID uint32 // required, sequence number http://www.ietf.org/rfc/rfc1982.txt
// the initial incoming-window of the sender
IncomingWindow uint32 // required
// the initial outgoing-window of the sender
OutgoingWindow uint32 // required
// the maximum handle value that can be used on the session
// The handle-max value is the highest handle value that can be
// used on the session. A peer MUST NOT attempt to attach a link
// using a handle value outside the range that its partner can handle.
// A peer that receives a handle outside the supported range MUST
// close the connection with the framing-error error-code.
HandleMax uint32 // default 4294967295
// the extension capabilities the sender supports
// http://www.amqp.org/specification/1.0/session-capabilities
OfferedCapabilities encoding.MultiSymbol
// the extension capabilities the sender can use if the receiver supports them
// The sender MUST NOT attempt to use any capability other than those it
// has declared in desired-capabilities field.
DesiredCapabilities encoding.MultiSymbol
// session properties
// http://www.amqp.org/specification/1.0/session-properties
Properties map[encoding.Symbol]any
}
// frameBody marks *PerformBegin as an implementation of FrameBody.
func (b *PerformBegin) frameBody() {}
// String implements the fmt.Stringer interface for type *PerformBegin.
//
// It renders every field, in declaration order, as a single line of the form
// "Begin{Name: value, ...}". RemoteChannel is a pointer, so it is rendered
// through formatUint16Ptr rather than printed as an address.
func (b *PerformBegin) String() string {
	const layout = "Begin{RemoteChannel: %v, NextOutgoingID: %d, IncomingWindow: %d, " +
		"OutgoingWindow: %d, HandleMax: %d, OfferedCapabilities: %v, DesiredCapabilities: %v, " +
		"Properties: %v}"

	remoteChannel := formatUint16Ptr(b.RemoteChannel)
	return fmt.Sprintf(layout,
		remoteChannel,
		b.NextOutgoingID,
		b.IncomingWindow,
		b.OutgoingWindow,
		b.HandleMax,
		b.OfferedCapabilities,
		b.DesiredCapabilities,
		b.Properties,
	)
}
// formatUint16Ptr returns the base-10 representation of the value p points
// to, or the literal "<nil>" when p is nil.
func formatUint16Ptr(p *uint16) string {
	if p != nil {
		value := uint64(*p)
		return strconv.FormatUint(value, 10)
	}
	return "<nil>"
}
// Marshal writes b to wr as a composite with type code encoding.TypeCodeBegin.
//
// The three mandatory fields (NextOutgoingID, IncomingWindow, OutgoingWindow)
// are never flagged Omit. RemoteChannel is flagged Omit when nil and
// HandleMax when it equals its spec default of 4294967295. Properties is
// flagged Omit only when the map is nil; an empty, non-nil map is not.
// It returns whatever error encoding.MarshalComposite reports.
func (b *PerformBegin) Marshal(wr *buffer.Buffer) error {
	fields := []encoding.MarshalField{
		{Value: b.RemoteChannel, Omit: b.RemoteChannel == nil},
		{Value: &b.NextOutgoingID, Omit: false},
		{Value: &b.IncomingWindow, Omit: false},
		{Value: &b.OutgoingWindow, Omit: false},
		{Value: &b.HandleMax, Omit: b.HandleMax == 4294967295},
		{Value: &b.OfferedCapabilities, Omit: len(b.OfferedCapabilities) == 0},
		{Value: &b.DesiredCapabilities, Omit: len(b.DesiredCapabilities) == 0},
		{Value: b.Properties, Omit: b.Properties == nil},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeBegin, fields)
}
// Unmarshal populates b from a composite with type code
// encoding.TypeCodeBegin read from r.
//
// Null handlers: NextOutgoingID, IncomingWindow and OutgoingWindow each
// return an error because the fields are mandatory; HandleMax is assigned
// its spec default of 4294967295.
// It returns whatever error encoding.UnmarshalComposite reports.
func (b *PerformBegin) Unmarshal(r *buffer.Buffer) error {
	nextOutgoingIDMissing := func() error {
		return errors.New("Begin.NextOutgoingID is required")
	}
	incomingWindowMissing := func() error {
		return errors.New("Begin.IncomingWindow is required")
	}
	outgoingWindowMissing := func() error {
		return errors.New("Begin.OutgoingWindow is required")
	}
	defaultHandleMax := func() error {
		b.HandleMax = 4294967295
		return nil
	}

	fields := []encoding.UnmarshalField{
		{Field: &b.RemoteChannel},
		{Field: &b.NextOutgoingID, HandleNull: nextOutgoingIDMissing},
		{Field: &b.IncomingWindow, HandleNull: incomingWindowMissing},
		{Field: &b.OutgoingWindow, HandleNull: outgoingWindowMissing},
		{Field: &b.HandleMax, HandleNull: defaultHandleMax},
		{Field: &b.OfferedCapabilities},
		{Field: &b.DesiredCapabilities},
		{Field: &b.Properties},
	}
	return encoding.UnmarshalComposite(r, encoding.TypeCodeBegin, fields...)
}
/*
<type name="attach" class="composite" source="list" provides="frame">
<descriptor name="amqp:attach:list" code="0x00000000:0x00000012"/>
<field name="name" type="string" mandatory="true"/>
<field name="handle" type="handle" mandatory="true"/>
<field name="role" type="role" mandatory="true"/>
<field name="snd-settle-mode" type="sender-settle-mode" default="mixed"/>
<field name="rcv-settle-mode" type="receiver-settle-mode" default="first"/>
<field name="source" type="*" requires="source"/>
<field name="target" type="*" requires="target"/>
<field name="unsettled" type="map"/>
<field name="incomplete-unsettled" type="boolean" default="false"/>
<field name="initial-delivery-count" type="sequence-no"/>
<field name="max-message-size" type="ulong"/>
<field name="offered-capabilities" type="symbol" multiple="true"/>
<field name="desired-capabilities" type="symbol" multiple="true"/>
<field name="properties" type="fields"/>
</type>
*/
// PerformAttach holds the fields of the AMQP "amqp:attach:list" performative.
// Fields appear in the same order as in the specification's field list.
type PerformAttach struct {
// the name of the link
//
// This name uniquely identifies the link from the container of the source
// to the container of the target node, e.g., if the container of the source
// node is A, and the container of the target node is B, the link MAY be
// globally identified by the (ordered) tuple (A,B,<name>).
Name string // required
// the handle for the link while attached
//
// The numeric handle assigned by the peer as a shorthand to refer to the
// link in all performatives that reference the link until it is detached.
//
// The handle MUST NOT be used for other open links. An attempt to attach using
// a handle which is already associated with a link MUST be responded to with
// an immediate close carrying a handle-in-use session-error.
//
// To make it easier to monitor AMQP link attach frames, it is RECOMMENDED that
// implementations always assign the lowest available handle to this field.
//
// The two endpoints MAY potentially use different handles to refer to the same link.
// Link handles MAY be reused once a link is closed for both send and receive.
Handle uint32 // required
// role of the link endpoint
//
// The role being played by the peer, i.e., whether the peer is the sender or the
// receiver of messages on the link.
Role encoding.Role
// settlement policy for the sender
//
// The delivery settlement policy for the sender. When set at the receiver this
// indicates the desired value for the settlement mode at the sender. When set
// at the sender this indicates the actual settlement mode in use. The sender
// SHOULD respect the receiver's desired settlement mode if the receiver initiates
// the attach exchange and the sender supports the desired mode.
//
// 0: unsettled - The sender will send all deliveries initially unsettled to the receiver.
// 1: settled - The sender will send all deliveries settled to the receiver.
// 2: mixed - The sender MAY send a mixture of settled and unsettled deliveries to the receiver.
SenderSettleMode *encoding.SenderSettleMode
// the settlement policy of the receiver
//
// The delivery settlement policy for the receiver. When set at the sender this
// indicates the desired value for the settlement mode at the receiver.
// When set at the receiver this indicates the actual settlement mode in use.
// The receiver SHOULD respect the sender's desired settlement mode if the sender
// initiates the attach exchange and the receiver supports the desired mode.
//
// 0: first - The receiver will spontaneously settle all incoming transfers.
// 1: second - The receiver will only settle after sending the disposition to
// the sender and receiving a disposition indicating settlement of
// the delivery from the sender.
ReceiverSettleMode *encoding.ReceiverSettleMode
// the source for messages
//
// If no source is specified on an outgoing link, then there is no source currently
// attached to the link. A link with no source will never produce outgoing messages.
Source *Source
// the target for messages
//
// If no target is specified on an incoming link, then there is no target currently
// attached to the link. A link with no target will never permit incoming messages.
Target *Target
// unsettled delivery state
//
// This is used to indicate any unsettled delivery states when a suspended link is
// resumed. The map is keyed by delivery-tag with values indicating the delivery state.
// The local and remote delivery states for a given delivery-tag MUST be compared to
// resolve any in-doubt deliveries. If necessary, deliveries MAY be resent, or resumed
// based on the outcome of this comparison. See subsection 2.6.13.
//
// If the local unsettled map is too large to be encoded within a frame of the agreed
// maximum frame size then the session MAY be ended with the frame-size-too-small error.
// The endpoint SHOULD make use of the ability to send an incomplete unsettled map
// (see below) to avoid sending an error.
//
// The unsettled map MUST NOT contain null valued keys.
//
// When reattaching (as opposed to resuming), the unsettled map MUST be null.
Unsettled encoding.Unsettled
// If set to true this field indicates that the unsettled map provided is not complete.
// When the map is incomplete the recipient of the map cannot take the absence of a
// delivery tag from the map as evidence of settlement. On receipt of an incomplete
// unsettled map a sending endpoint MUST NOT send any new deliveries (i.e. deliveries
// where resume is not set to true) to its partner (and a receiving endpoint which sent
// an incomplete unsettled map MUST detach with an error on receiving a transfer which
// does not have the resume flag set to true).
//
// Note that if this flag is set to true then the endpoints MUST detach and reattach at
// least once in order to send new deliveries. This flag can be useful when there are
// too many entries in the unsettled map to fit within a single frame. An endpoint can
// attach, resume, settle, and detach until enough unsettled state has been cleared for
// an attach where this flag is set to false.
IncompleteUnsettled bool // default: false
// the sender's initial value for delivery-count
//
// This MUST NOT be null if role is sender, and it is ignored if the role is receiver.
InitialDeliveryCount uint32 // sequence number
// the maximum message size supported by the link endpoint
//
// This field indicates the maximum message size supported by the link endpoint.
// Any attempt to deliver a message larger than this results in a message-size-exceeded
// link-error. If this field is zero or unset, there is no maximum size imposed by the
// link endpoint.
MaxMessageSize uint64
// the extension capabilities the sender supports
// http://www.amqp.org/specification/1.0/link-capabilities
OfferedCapabilities encoding.MultiSymbol
// the extension capabilities the sender can use if the receiver supports them
//
// The sender MUST NOT attempt to use any capability other than those it
// has declared in desired-capabilities field.
DesiredCapabilities encoding.MultiSymbol
// link properties
// http://www.amqp.org/specification/1.0/link-properties
Properties map[encoding.Symbol]any
}
// frameBody marks *PerformAttach as an implementation of FrameBody.
func (a *PerformAttach) frameBody() {}
// String implements the fmt.Stringer interface for type PerformAttach.
//
// It renders every field, in declaration order, as a single line of the form
// "Attach{Name: value, ...}". Role and the two settle-mode pointers are
// formatted with %s; Source and Target with %v.
func (a PerformAttach) String() string {
	const layout = "Attach{Name: %s, Handle: %d, Role: %s, SenderSettleMode: %s, ReceiverSettleMode: %s, " +
		"Source: %v, Target: %v, Unsettled: %v, IncompleteUnsettled: %t, InitialDeliveryCount: %d, MaxMessageSize: %d, " +
		"OfferedCapabilities: %v, DesiredCapabilities: %v, Properties: %v}"

	return fmt.Sprintf(layout,
		a.Name,
		a.Handle,
		a.Role,
		a.SenderSettleMode,
		a.ReceiverSettleMode,
		a.Source,
		a.Target,
		a.Unsettled,
		a.IncompleteUnsettled,
		a.InitialDeliveryCount,
		a.MaxMessageSize,
		a.OfferedCapabilities,
		a.DesiredCapabilities,
		a.Properties,
	)
}
// Marshal writes a to wr as a composite with type code
// encoding.TypeCodeAttach.
//
// Name, Handle and Role are never flagged Omit. Pointer fields are flagged
// Omit when nil, and collections and numeric fields when empty or zero.
// InitialDeliveryCount is flagged Omit whenever Role is
// encoding.RoleReceiver, regardless of its value.
// It returns whatever error encoding.MarshalComposite reports.
func (a *PerformAttach) Marshal(wr *buffer.Buffer) error {
	isReceiver := a.Role == encoding.RoleReceiver

	fields := []encoding.MarshalField{
		{Value: &a.Name, Omit: false},
		{Value: &a.Handle, Omit: false},
		{Value: &a.Role, Omit: false},
		{Value: a.SenderSettleMode, Omit: a.SenderSettleMode == nil},
		{Value: a.ReceiverSettleMode, Omit: a.ReceiverSettleMode == nil},
		{Value: a.Source, Omit: a.Source == nil},
		{Value: a.Target, Omit: a.Target == nil},
		{Value: a.Unsettled, Omit: len(a.Unsettled) == 0},
		{Value: &a.IncompleteUnsettled, Omit: !a.IncompleteUnsettled},
		{Value: &a.InitialDeliveryCount, Omit: isReceiver},
		{Value: &a.MaxMessageSize, Omit: a.MaxMessageSize == 0},
		{Value: &a.OfferedCapabilities, Omit: len(a.OfferedCapabilities) == 0},
		{Value: &a.DesiredCapabilities, Omit: len(a.DesiredCapabilities) == 0},
		{Value: a.Properties, Omit: len(a.Properties) == 0},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeAttach, fields)
}
// Unmarshal populates a from a composite with type code
// encoding.TypeCodeAttach read from r.
//
// Null handlers: Name, Handle and Role each return an error because the
// fields are mandatory. No other field has a null handler.
// It returns whatever error encoding.UnmarshalComposite reports.
func (a *PerformAttach) Unmarshal(r *buffer.Buffer) error {
	nameMissing := func() error {
		return errors.New("Attach.Name is required")
	}
	handleMissing := func() error {
		return errors.New("Attach.Handle is required")
	}
	roleMissing := func() error {
		return errors.New("Attach.Role is required")
	}

	fields := []encoding.UnmarshalField{
		{Field: &a.Name, HandleNull: nameMissing},
		{Field: &a.Handle, HandleNull: handleMissing},
		{Field: &a.Role, HandleNull: roleMissing},
		{Field: &a.SenderSettleMode},
		{Field: &a.ReceiverSettleMode},
		{Field: &a.Source},
		{Field: &a.Target},
		{Field: &a.Unsettled},
		{Field: &a.IncompleteUnsettled},
		{Field: &a.InitialDeliveryCount},
		{Field: &a.MaxMessageSize},
		{Field: &a.OfferedCapabilities},
		{Field: &a.DesiredCapabilities},
		{Field: &a.Properties},
	}
	return encoding.UnmarshalComposite(r, encoding.TypeCodeAttach, fields...)
}
/*
<type name="flow" class="composite" source="list" provides="frame">
<descriptor name="amqp:flow:list" code="0x00000000:0x00000013"/>
<field name="next-incoming-id" type="transfer-number"/>
<field name="incoming-window" type="uint" mandatory="true"/>
<field name="next-outgoing-id" type="transfer-number" mandatory="true"/>
<field name="outgoing-window" type="uint" mandatory="true"/>
<field name="handle" type="handle"/>
<field name="delivery-count" type="sequence-no"/>
<field name="link-credit" type="uint"/>
<field name="available" type="uint"/>
<field name="drain" type="boolean" default="false"/>
<field name="echo" type="boolean" default="false"/>
<field name="properties" type="fields"/>
</type>
*/
// PerformFlow holds the fields of the AMQP flow performative
// (amqp:flow:list, descriptor code 0x13).
//
// Pointer-typed fields are optional: Marshal omits them when they are nil.
type PerformFlow struct {
// Identifies the expected transfer-id of the next incoming transfer frame.
// This value MUST be set if the peer has received the begin frame for the
// session, and MUST NOT be set if it has not. See subsection 2.5.6 for more details.
NextIncomingID *uint32 // sequence number
// Defines the maximum number of incoming transfer frames that the endpoint
// can currently receive. See subsection 2.5.6 for more details.
IncomingWindow uint32 // required
// The transfer-id that will be assigned to the next outgoing transfer frame.
// See subsection 2.5.6 for more details.
NextOutgoingID uint32 // required, sequence number
// Defines the maximum number of outgoing transfer frames that the endpoint
// could potentially currently send, if it was not constrained by restrictions
// imposed by its peer's incoming-window. See subsection 2.5.6 for more details.
OutgoingWindow uint32 // required
// If set, indicates that the flow frame carries flow state information for the local
// link endpoint associated with the given handle. If not set, the flow frame is
// carrying only information pertaining to the session endpoint.
//
// If set to a handle that is not currently associated with an attached link,
// the recipient MUST respond by ending the session with an unattached-handle
// session error.
Handle *uint32
// The delivery-count is initialized by the sender when a link endpoint is created,
// and is incremented whenever a message is sent. Only the sender MAY independently
// modify this field. The receiver's value is calculated based on the last known
// value from the sender and any subsequent messages received on the link. Note that,
// despite its name, the delivery-count is not a count but a sequence number
// initialized at an arbitrary point by the sender.
//
// When the handle field is not set, this field MUST NOT be set.
//
// When the handle identifies that the flow state is being sent from the sender link
// endpoint to receiver link endpoint this field MUST be set to the current
// delivery-count of the link endpoint.
//
// When the flow state is being sent from the receiver endpoint to the sender endpoint
// this field MUST be set to the last known value of the corresponding sending endpoint.
// In the event that the receiving link endpoint has not yet seen the initial attach
// frame from the sender this field MUST NOT be set.
DeliveryCount *uint32 // sequence number
// the current maximum number of messages that can be received
//
// The current maximum number of messages that can be handled at the receiver endpoint
// of the link. Only the receiver endpoint can independently set this value. The sender
// endpoint sets this to the last known value seen from the receiver.
// See subsection 2.6.7 for more details.
//
// When the handle field is not set, this field MUST NOT be set.
LinkCredit *uint32
// the number of available messages
//
// The number of messages awaiting credit at the link sender endpoint. Only the sender
// can independently set this value. The receiver sets this to the last known value seen
// from the sender. See subsection 2.6.7 for more details.
//
// When the handle field is not set, this field MUST NOT be set.
Available *uint32
// indicates drain mode
//
// When flow state is sent from the sender to the receiver, this field contains the
// actual drain mode of the sender. When flow state is sent from the receiver to the
// sender, this field contains the desired drain mode of the receiver.
// See subsection 2.6.7 for more details.
//
// When the handle field is not set, this field MUST NOT be set.
Drain bool
// request state from partner
//
// If set to true then the receiver SHOULD send its state at the earliest convenient
// opportunity.
//
// If set to true, and the handle field is not set, then the sender only requires
// session endpoint state to be echoed, however, the receiver MAY fulfil this requirement
// by sending a flow performative carrying link-specific state (since any such flow also
// carries session state).
//
// If a sender makes multiple requests for the same state before the receiver can reply,
// the receiver MAY send only one flow in return.
//
// Note that if a peer responds to echo requests with flows which themselves have the
// echo field set to true, an infinite loop could result if its partner adopts the same
// policy (therefore such a policy SHOULD be avoided).
Echo bool
// link state properties
// http://www.amqp.org/specification/1.0/link-state-properties
Properties map[encoding.Symbol]any
}
// frameBody is an empty marker method.
// NOTE(review): presumably satisfies a frame-body interface declared elsewhere in this package — confirm.
func (f *PerformFlow) frameBody() {}
// String renders every field of the flow performative on a single line.
// Optional pointer fields are formatted with formatUint32Ptr, so an unset
// field shows as "<nil>" rather than as a pointer address.
func (f *PerformFlow) String() string {
	const layout = "Flow{NextIncomingID: %s, IncomingWindow: %d, NextOutgoingID: %d, OutgoingWindow: %d, " +
		"Handle: %s, DeliveryCount: %s, LinkCredit: %s, Available: %s, Drain: %t, Echo: %t, Properties: %+v}"

	// Pre-format the optional uint32 fields.
	var (
		nextIncoming  = formatUint32Ptr(f.NextIncomingID)
		handle        = formatUint32Ptr(f.Handle)
		deliveryCount = formatUint32Ptr(f.DeliveryCount)
		linkCredit    = formatUint32Ptr(f.LinkCredit)
		available     = formatUint32Ptr(f.Available)
	)

	return fmt.Sprintf(layout,
		nextIncoming,
		f.IncomingWindow,
		f.NextOutgoingID,
		f.OutgoingWindow,
		handle,
		deliveryCount,
		linkCredit,
		available,
		f.Drain,
		f.Echo,
		f.Properties,
	)
}
// formatUint32Ptr returns the base-10 representation of the value p points
// to, or the literal "<nil>" when p is nil.
func formatUint32Ptr(p *uint32) string {
	if p != nil {
		return strconv.FormatUint(uint64(*p), 10)
	}
	return "<nil>"
}
// Marshal encodes the flow performative into wr as a composite with type
// code encoding.TypeCodeFlow.
//
// Nil pointer fields, false booleans and an empty Properties map are marked
// Omit; the three window/id fields are always included.
func (f *PerformFlow) Marshal(wr *buffer.Buffer) error {
	// Entries are positional: the order here is the order on the wire.
	fields := []encoding.MarshalField{
		{Value: f.NextIncomingID, Omit: f.NextIncomingID == nil},
		{Value: &f.IncomingWindow},
		{Value: &f.NextOutgoingID},
		{Value: &f.OutgoingWindow},
		{Value: f.Handle, Omit: f.Handle == nil},
		{Value: f.DeliveryCount, Omit: f.DeliveryCount == nil},
		{Value: f.LinkCredit, Omit: f.LinkCredit == nil},
		{Value: f.Available, Omit: f.Available == nil},
		{Value: &f.Drain, Omit: !f.Drain},
		{Value: &f.Echo, Omit: !f.Echo},
		{Value: f.Properties, Omit: len(f.Properties) == 0},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeFlow, fields)
}
// Unmarshal decodes a flow performative from r into f.
//
// IncomingWindow, NextOutgoingID and OutgoingWindow each carry a HandleNull
// callback that reports the field as required; all other fields have none.
func (f *PerformFlow) Unmarshal(r *buffer.Buffer) error {
	// Null handlers for the mandatory fields.
	incomingWindowRequired := func() error { return errors.New("Flow.IncomingWindow is required") }
	nextOutgoingIDRequired := func() error { return errors.New("Flow.NextOutgoingID is required") }
	outgoingWindowRequired := func() error { return errors.New("Flow.OutgoingWindow is required") }

	// Entries are positional and follow the flow type definition.
	fields := []encoding.UnmarshalField{
		{Field: &f.NextIncomingID},
		{Field: &f.IncomingWindow, HandleNull: incomingWindowRequired},
		{Field: &f.NextOutgoingID, HandleNull: nextOutgoingIDRequired},
		{Field: &f.OutgoingWindow, HandleNull: outgoingWindowRequired},
		{Field: &f.Handle},
		{Field: &f.DeliveryCount},
		{Field: &f.LinkCredit},
		{Field: &f.Available},
		{Field: &f.Drain},
		{Field: &f.Echo},
		{Field: &f.Properties},
	}
	return encoding.UnmarshalComposite(r, encoding.TypeCodeFlow, fields...)
}
/*
<type name="transfer" class="composite" source="list" provides="frame">
<descriptor name="amqp:transfer:list" code="0x00000000:0x00000014"/>
<field name="handle" type="handle" mandatory="true"/>
<field name="delivery-id" type="delivery-number"/>
<field name="delivery-tag" type="delivery-tag"/>
<field name="message-format" type="message-format"/>
<field name="settled" type="boolean"/>
<field name="more" type="boolean" default="false"/>
<field name="rcv-settle-mode" type="receiver-settle-mode"/>
<field name="state" type="*" requires="delivery-state"/>
<field name="resume" type="boolean" default="false"/>
<field name="aborted" type="boolean" default="false"/>
<field name="batchable" type="boolean" default="false"/>
</type>
*/
// PerformTransfer holds the fields of the AMQP transfer performative
// (amqp:transfer:list, descriptor code 0x14) together with the payload bytes
// that accompany it.
//
// Pointer-typed fields are optional: Marshal omits them when they are nil.
// Payload and Done are not part of the encoded field list.
type PerformTransfer struct {
// Specifies the link on which the message is transferred.
Handle uint32 // required
// The delivery-id MUST be supplied on the first transfer of a multi-transfer
// delivery. On continuation transfers the delivery-id MAY be omitted. It is
// an error if the delivery-id on a continuation transfer differs from the
// delivery-id on the first transfer of a delivery.
DeliveryID *uint32 // sequence number
// Uniquely identifies the delivery attempt for a given message on this link.
// This field MUST be specified for the first transfer of a multi-transfer
// message and can only be omitted for continuation transfers. It is an error
// if the delivery-tag on a continuation transfer differs from the delivery-tag
// on the first transfer of a delivery.
DeliveryTag []byte // up to 32 bytes
// This field MUST be specified for the first transfer of a multi-transfer message
// and can only be omitted for continuation transfers. It is an error if the
// message-format on a continuation transfer differs from the message-format on
// the first transfer of a delivery.
//
// The upper three octets of a message format code identify a particular message
// format. The lowest octet indicates the version of said message format. Any given
// version of a format is forwards compatible with all higher versions.
MessageFormat *uint32
// If not set on the first (or only) transfer for a (multi-transfer) delivery,
// then the settled flag MUST be interpreted as being false. For subsequent
// transfers in a multi-transfer delivery if the settled flag is left unset then
// it MUST be interpreted as true if and only if the value of the settled flag on
// any of the preceding transfers was true; if no preceding transfer was sent with
// settled being true then the value when unset MUST be taken as false.
//
// If the negotiated value for snd-settle-mode at attachment is settled, then this
// field MUST be true on at least one transfer frame for a delivery (i.e., the
// delivery MUST be settled at the sender at the point the delivery has been
// completely transferred).
//
// If the negotiated value for snd-settle-mode at attachment is unsettled, then this
// field MUST be false (or unset) on every transfer frame for a delivery (unless the
// delivery is aborted).
Settled bool
// indicates that the message has more content
//
// Note that if both the more and aborted fields are set to true, the aborted flag
// takes precedence. That is, a receiver SHOULD ignore the value of the more field
// if the transfer is marked as aborted. A sender SHOULD NOT set the more flag to
// true if it also sets the aborted flag to true.
More bool
// If first, this indicates that the receiver MUST settle the delivery once it has
// arrived without waiting for the sender to settle first.
//
// If second, this indicates that the receiver MUST NOT settle until sending its
// disposition to the sender and receiving a settled disposition from the sender.
//
// If not set, this value is defaulted to the value negotiated on link attach.
//
// If the negotiated link value is first, then it is illegal to set this field
// to second.
//
// If the message is being sent settled by the sender, the value of this field
// is ignored.
//
// The (implicit or explicit) value of this field does not form part of the
// transfer state, and is not retained if a link is suspended and subsequently resumed.
//
// 0: first - The receiver will spontaneously settle all incoming transfers.
// 1: second - The receiver will only settle after sending the disposition to
// the sender and receiving a disposition indicating settlement of
// the delivery from the sender.
ReceiverSettleMode *encoding.ReceiverSettleMode
// the state of the delivery at the sender
//
// When set this informs the receiver of the state of the delivery at the sender.
// This is particularly useful when transfers of unsettled deliveries are resumed
// after resuming a link. Setting the state on the transfer can be thought of as
// being equivalent to sending a disposition immediately before the transfer
// performative, i.e., it is the state of the delivery (not the transfer) that
// existed at the point the frame was sent.
//
// Note that if the transfer performative (or an earlier disposition performative
// referring to the delivery) indicates that the delivery has attained a terminal
// state, then no future transfer or disposition sent by the sender can alter that
// terminal state.
State encoding.DeliveryState
// indicates a resumed delivery
//
// If true, the resume flag indicates that the transfer is being used to reassociate
// an unsettled delivery from a dissociated link endpoint. See subsection 2.6.13
// for more details.
//
// The receiver MUST ignore resumed deliveries that are not in its local unsettled map.
// The sender MUST NOT send resumed transfers for deliveries not in its local
// unsettled map.
//
// If a resumed delivery spans more than one transfer performative, then the resume
// flag MUST be set to true on the first transfer of the resumed delivery. For
// subsequent transfers for the same delivery the resume flag MAY be set to true,
// or MAY be omitted.
//
// In the case where the exchange of unsettled maps makes clear that all message
// data has been successfully transferred to the receiver, and that only the final
// state (and potentially settlement) at the sender needs to be conveyed, then a
// resumed delivery MAY carry no payload and instead act solely as a vehicle for
// carrying the terminal state of the delivery at the sender.
Resume bool
// indicates that the message is aborted
//
// Aborted messages SHOULD be discarded by the recipient (any payload within the
// frame carrying the performative MUST be ignored). An aborted message is
// implicitly settled.
Aborted bool
// batchable hint
//
// If true, then the issuer is hinting that there is no need for the peer to urgently
// communicate updated delivery state. This hint MAY be used to artificially increase
// the amount of batching an implementation uses when communicating delivery states,
// and thereby save bandwidth.
//
// If the message being delivered is too large to fit within a single frame, then the
// setting of batchable to true on any of the transfer performatives for the delivery
// is equivalent to setting batchable to true for all the transfer performatives for
// the delivery.
//
// The batchable value does not form part of the transfer state, and is not retained
// if a link is suspended and subsequently resumed.
Batchable bool
// the bytes that accompany the performative
//
// Marshal appends Payload to the buffer after the encoded fields; Unmarshal
// stores a copy of r.Bytes() here once the fields have been decoded.
Payload []byte
// optional channel to indicate to the sender that the transfer has completed
//
// Settled=true: closed when the transfer has been sent on the network.
// Settled=false: closed when the receiver has confirmed settlement.
Done chan encoding.DeliveryState
}
// frameBody is an empty marker method.
// NOTE(review): presumably satisfies a frame-body interface declared elsewhere in this package — confirm.
func (t *PerformTransfer) frameBody() {}
// String renders the transfer performative on a single line. The delivery
// tag is shown as upper-case hex (or "<nil>" when the slice is nil) and the
// payload is summarised by its length only.
func (t PerformTransfer) String() string {
	const layout = "Transfer{Handle: %d, DeliveryID: %s, DeliveryTag: %s, MessageFormat: %s, " +
		"Settled: %t, More: %t, ReceiverSettleMode: %s, State: %v, Resume: %t, Aborted: %t, " +
		"Batchable: %t, Payload [size]: %d}"

	// A nil tag is distinguished from an empty one: only nil prints "<nil>".
	var tag string
	if t.DeliveryTag == nil {
		tag = "<nil>"
	} else {
		tag = fmt.Sprintf("%X", t.DeliveryTag)
	}

	deliveryID := formatUint32Ptr(t.DeliveryID)
	messageFormat := formatUint32Ptr(t.MessageFormat)
	payloadSize := len(t.Payload)

	return fmt.Sprintf(layout,
		t.Handle,
		deliveryID,
		tag,
		messageFormat,
		t.Settled,
		t.More,
		t.ReceiverSettleMode,
		t.State,
		t.Resume,
		t.Aborted,
		t.Batchable,
		payloadSize,
	)
}
// Marshal encodes the transfer performative into wr as a composite with type
// code encoding.TypeCodeTransfer and then appends Payload to wr.
//
// If encoding the composite fails, the error is returned and the payload is
// not appended.
func (t *PerformTransfer) Marshal(wr *buffer.Buffer) error {
	// Entries are positional: the order here is the order on the wire.
	fields := []encoding.MarshalField{
		{Value: &t.Handle},
		{Value: t.DeliveryID, Omit: t.DeliveryID == nil},
		{Value: &t.DeliveryTag, Omit: len(t.DeliveryTag) == 0},
		{Value: t.MessageFormat, Omit: t.MessageFormat == nil},
		{Value: &t.Settled, Omit: !t.Settled},
		{Value: &t.More, Omit: !t.More},
		{Value: t.ReceiverSettleMode, Omit: t.ReceiverSettleMode == nil},
		{Value: t.State, Omit: t.State == nil},
		{Value: &t.Resume, Omit: !t.Resume},
		{Value: &t.Aborted, Omit: !t.Aborted},
		{Value: &t.Batchable, Omit: !t.Batchable},
	}
	if err := encoding.MarshalComposite(wr, encoding.TypeCodeTransfer, fields); err != nil {
		return err
	}

	// The payload follows the performative in the same buffer.
	wr.Append(t.Payload)
	return nil
}
// Unmarshal decodes a transfer performative from r into t and then stores a
// copy of r.Bytes() in t.Payload.
//
// Handle carries a HandleNull callback that reports it as required. If
// decoding the composite fails, the error is returned and Payload is left
// untouched.
func (t *PerformTransfer) Unmarshal(r *buffer.Buffer) error {
	handleRequired := func() error { return errors.New("Transfer.Handle is required") }

	// Entries are positional and follow the transfer type definition.
	fields := []encoding.UnmarshalField{
		{Field: &t.Handle, HandleNull: handleRequired},
		{Field: &t.DeliveryID},
		{Field: &t.DeliveryTag},
		{Field: &t.MessageFormat},
		{Field: &t.Settled},
		{Field: &t.More},
		{Field: &t.ReceiverSettleMode},
		{Field: &t.State},
		{Field: &t.Resume},
		{Field: &t.Aborted},
		{Field: &t.Batchable},
	}
	if err := encoding.UnmarshalComposite(r, encoding.TypeCodeTransfer, fields...); err != nil {
		return err
	}

	// Copy rather than alias so the payload does not share r's backing array.
	t.Payload = append([]byte(nil), r.Bytes()...)
	return nil
}
/*
<type name="disposition" class="composite" source="list" provides="frame">
<descriptor name="amqp:disposition:list" code="0x00000000:0x00000015"/>
<field name="role" type="role" mandatory="true"/>
<field name="first" type="delivery-number" mandatory="true"/>
<field name="last" type="delivery-number"/>
<field name="settled" type="boolean" default="false"/>
<field name="state" type="*" requires="delivery-state"/>
<field name="batchable" type="boolean" default="false"/>
</type>
*/
// PerformDisposition holds the fields of the AMQP disposition performative
// (amqp:disposition:list, descriptor code 0x15).
//
// Role and First are mandatory in the type definition; Last is optional and
// is omitted by Marshal when nil.
type PerformDisposition struct {
// directionality of disposition
//
// The role identifies whether the disposition frame contains information about
// sending link endpoints or receiving link endpoints.
Role encoding.Role // required
// lower bound of deliveries
//
// Identifies the lower bound of delivery-ids for the deliveries in this set.
First uint32 // required, sequence number
// upper bound of deliveries
//
// Identifies the upper bound of delivery-ids for the deliveries in this set.
// If not set, this is taken to be the same as first.
Last *uint32 // sequence number
// indicates deliveries are settled
//
// If true, indicates that the referenced deliveries are considered settled by
// the issuing endpoint.
Settled bool
// indicates state of deliveries
//
// Communicates the state of all the deliveries referenced by this disposition.
State encoding.DeliveryState
// batchable hint
//
// If true, then the issuer is hinting that there is no need for the peer to
// urgently communicate the impact of the updated delivery states. This hint
// MAY be used to artificially increase the amount of batching an implementation
// uses when communicating delivery states, and thereby save bandwidth.
Batchable bool
}
// frameBody is an empty marker method.
// NOTE(review): presumably satisfies a frame-body interface declared elsewhere in this package — confirm.
func (d *PerformDisposition) frameBody() {}
// String renders the disposition performative on a single line. The optional
// Last field is formatted with formatUint32Ptr, so an unset value shows as
// "<nil>".
func (d PerformDisposition) String() string {
	const layout = "Disposition{Role: %s, First: %d, Last: %s, Settled: %t, State: %v, Batchable: %t}"

	last := formatUint32Ptr(d.Last)
	return fmt.Sprintf(layout, d.Role, d.First, last, d.Settled, d.State, d.Batchable)
}
// Marshal encodes the disposition performative into wr as a composite with
// type code encoding.TypeCodeDisposition.
//
// Role and First are always included; a nil Last, a nil State and false
// booleans are marked Omit.
func (d *PerformDisposition) Marshal(wr *buffer.Buffer) error {
	// Entries are positional: the order here is the order on the wire.
	fields := []encoding.MarshalField{
		{Value: &d.Role},
		{Value: &d.First},
		{Value: d.Last, Omit: d.Last == nil},
		{Value: &d.Settled, Omit: !d.Settled},
		{Value: d.State, Omit: d.State == nil},
		{Value: &d.Batchable, Omit: !d.Batchable},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeDisposition, fields)
}
// Unmarshal decodes a disposition performative from r into d.
//
// Role and First are mandatory in the disposition type definition; each
// carries a HandleNull callback returning an error that names the missing
// field. The remaining fields have no null handler.
func (d *PerformDisposition) Unmarshal(r *buffer.Buffer) error {
	return encoding.UnmarshalComposite(r, encoding.TypeCodeDisposition, []encoding.UnmarshalField{
		{Field: &d.Role, HandleNull: func() error { return errors.New("Disposition.Role is required") }},
		// The second field is First; disposition has no handle field, so the
		// error must name First to be actionable.
		{Field: &d.First, HandleNull: func() error { return errors.New("Disposition.First is required") }},
		{Field: &d.Last},
		{Field: &d.Settled},
		{Field: &d.State},
		{Field: &d.Batchable},
	}...)
}
/*
<type name="detach" class="composite" source="list" provides="frame">
<descriptor name="amqp:detach:list" code="0x00000000:0x00000016"/>
<field name="handle" type="handle" mandatory="true"/>
<field name="closed" type="boolean" default="false"/>
<field name="error" type="error"/>
</type>
*/
// PerformDetach holds the fields of the AMQP detach performative
// (amqp:detach:list, descriptor code 0x16).
type PerformDetach struct {
// the local handle of the link to be detached
Handle uint32 // required
// if true then the sender has closed the link
Closed bool
// error causing the detach
//
// If set, this field indicates that the link is being detached due to an error
// condition. The value of the field SHOULD contain details on the cause of the error.
Error *encoding.Error
}
// frameBody is an empty marker method.
// NOTE(review): presumably satisfies a frame-body interface declared elsewhere in this package — confirm.
func (d *PerformDetach) frameBody() {}
// String renders the detach performative (handle, closed flag and error) on
// a single line.
func (d PerformDetach) String() string {
	const layout = "Detach{Handle: %d, Closed: %t, Error: %v}"
	return fmt.Sprintf(layout, d.Handle, d.Closed, d.Error)
}
// Marshal encodes the detach performative into wr as a composite with type
// code encoding.TypeCodeDetach.
//
// Handle is always included; Closed is marked Omit when false and Error when
// nil.
func (d *PerformDetach) Marshal(wr *buffer.Buffer) error {
	// Entries are positional: the order here is the order on the wire.
	fields := []encoding.MarshalField{
		{Value: &d.Handle},
		{Value: &d.Closed, Omit: !d.Closed},
		{Value: d.Error, Omit: d.Error == nil},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeDetach, fields)
}
// Unmarshal decodes a detach performative from r into d.
//
// Handle carries a HandleNull callback that reports it as required; Closed
// and Error have no null handler.
func (d *PerformDetach) Unmarshal(r *buffer.Buffer) error {
	handleRequired := func() error { return errors.New("Detach.Handle is required") }

	fields := []encoding.UnmarshalField{
		{Field: &d.Handle, HandleNull: handleRequired},
		{Field: &d.Closed},
		{Field: &d.Error},
	}
	return encoding.UnmarshalComposite(r, encoding.TypeCodeDetach, fields...)
}
/*
<type name="end" class="composite" source="list" provides="frame">
<descriptor name="amqp:end:list" code="0x00000000:0x00000017"/>
<field name="error" type="error"/>
</type>
*/
// PerformEnd holds the single field of the AMQP end performative
// (amqp:end:list, descriptor code 0x17).
type PerformEnd struct {
// error causing the end
//
// If set, this field indicates that the session is being ended due to an error
// condition. The value of the field SHOULD contain details on the cause of the error.
Error *encoding.Error
}
// frameBody is an empty marker method.
// NOTE(review): presumably satisfies a frame-body interface declared elsewhere in this package — confirm.
func (e *PerformEnd) frameBody() {}
// String renders the end performative and its error on a single line.
func (e PerformEnd) String() string {
	const layout = "End{Error: %v}"
	return fmt.Sprintf(layout, e.Error)
}
// Marshal encodes the end performative into wr as a composite with type code
// encoding.TypeCodeEnd. The Error field is marked Omit when nil.
func (e *PerformEnd) Marshal(wr *buffer.Buffer) error {
	fields := []encoding.MarshalField{
		{Value: e.Error, Omit: e.Error == nil},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeEnd, fields)
}
// Unmarshal decodes an end performative from r into e. Its only field,
// Error, has no null handler.
func (e *PerformEnd) Unmarshal(r *buffer.Buffer) error {
	errField := encoding.UnmarshalField{Field: &e.Error}
	return encoding.UnmarshalComposite(r, encoding.TypeCodeEnd, errField)
}
/*
<type name="close" class="composite" source="list" provides="frame">
<descriptor name="amqp:close:list" code="0x00000000:0x00000018"/>
<field name="error" type="error"/>
</type>
*/
// PerformClose holds the single field of the AMQP close performative
// (amqp:close:list, descriptor code 0x18).
type PerformClose struct {
// error causing the close
//
// If set, this field indicates that the connection is being closed due to an error
// condition. The value of the field SHOULD contain details on the cause of the error.
Error *encoding.Error
}
// frameBody is an empty marker method.
// NOTE(review): presumably satisfies a frame-body interface declared elsewhere in this package — confirm.
func (c *PerformClose) frameBody() {}
// Marshal encodes the close performative into wr as a composite with type
// code encoding.TypeCodeClose. The Error field is marked Omit when nil.
func (c *PerformClose) Marshal(wr *buffer.Buffer) error {
	fields := []encoding.MarshalField{
		{Value: c.Error, Omit: c.Error == nil},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeClose, fields)
}
// Unmarshal decodes a close performative from r into c. Its only field,
// Error, has no null handler.
func (c *PerformClose) Unmarshal(r *buffer.Buffer) error {
	errField := encoding.UnmarshalField{Field: &c.Error}
	return encoding.UnmarshalComposite(r, encoding.TypeCodeClose, errField)
}
// String renders the close performative and its error on a single line.
func (c *PerformClose) String() string {
	const layout = "Close{Error: %s}"
	return fmt.Sprintf(layout, c.Error)
}
/*
<type name="sasl-init" class="composite" source="list" provides="sasl-frame">
<descriptor name="amqp:sasl-init:list" code="0x00000000:0x00000041"/>
<field name="mechanism" type="symbol" mandatory="true"/>
<field name="initial-response" type="binary"/>
<field name="hostname" type="string"/>
</type>
*/
// SASLInit holds the fields of the sasl-init frame
// (amqp:sasl-init:list, descriptor code 0x41).
type SASLInit struct {
// the selected SASL mechanism; mandatory in the type definition
Mechanism encoding.Symbol
// optional binary initial response; String elides it because it may
// contain a plain text secret
InitialResponse []byte
// optional hostname; Marshal omits it when empty
Hostname string
}
// frameBody is an empty marker method.
// NOTE(review): presumably satisfies a frame-body interface declared elsewhere in this package — confirm.
func (si *SASLInit) frameBody() {}
// Marshal encodes the sasl-init frame into wr as a composite with type code
// encoding.TypeCodeSASLInit.
//
// Mechanism and InitialResponse are never marked Omit; Hostname is marked
// Omit when it is the empty string.
func (si *SASLInit) Marshal(wr *buffer.Buffer) error {
	// Entries are positional: the order here is the order on the wire.
	fields := []encoding.MarshalField{
		{Value: &si.Mechanism},
		{Value: &si.InitialResponse},
		{Value: &si.Hostname, Omit: len(si.Hostname) == 0},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeSASLInit, fields)
}
// Unmarshal decodes a sasl-init frame from r into si.
//
// Mechanism carries a HandleNull callback that reports it as required;
// InitialResponse and Hostname have no null handler.
func (si *SASLInit) Unmarshal(r *buffer.Buffer) error {
	mechanismRequired := func() error { return errors.New("saslInit.Mechanism is required") }

	fields := []encoding.UnmarshalField{
		{Field: &si.Mechanism, HandleNull: mechanismRequired},
		{Field: &si.InitialResponse},
		{Field: &si.Hostname},
	}
	return encoding.UnmarshalComposite(r, encoding.TypeCodeSASLInit, fields...)
}
// String renders the sasl-init frame on a single line. InitialResponse is
// always printed as a fixed mask because it may contain a plain text secret.
func (si *SASLInit) String() string {
	const layout = "SaslInit{Mechanism : %s, InitialResponse: ********, Hostname: %s}"
	return fmt.Sprintf(layout, si.Mechanism, si.Hostname)
}
/*
<type name="sasl-mechanisms" class="composite" source="list" provides="sasl-frame">
<descriptor name="amqp:sasl-mechanisms:list" code="0x00000000:0x00000040"/>
<field name="sasl-server-mechanisms" type="symbol" multiple="true" mandatory="true"/>
</type>
*/
// SASLMechanisms holds the single field of the sasl-mechanisms frame
// (amqp:sasl-mechanisms:list, descriptor code 0x40).
type SASLMechanisms struct {
// the sasl-server-mechanisms list; mandatory in the type definition
Mechanisms encoding.MultiSymbol
}
// frameBody is an empty marker method.
// NOTE(review): presumably satisfies a frame-body interface declared elsewhere in this package — confirm.
func (sm *SASLMechanisms) frameBody() {}
// Marshal encodes the sasl-mechanisms frame into wr as a composite with type
// code encoding.TypeCodeSASLMechanism. Mechanisms is never marked Omit.
func (sm *SASLMechanisms) Marshal(wr *buffer.Buffer) error {
	fields := []encoding.MarshalField{
		{Value: &sm.Mechanisms},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeSASLMechanism, fields)
}
// Unmarshal decodes a sasl-mechanisms frame from r into sm. Mechanisms
// carries a HandleNull callback that reports it as required.
func (sm *SASLMechanisms) Unmarshal(r *buffer.Buffer) error {
	mechanismsRequired := func() error { return errors.New("saslMechanisms.Mechanisms is required") }

	mechanisms := encoding.UnmarshalField{Field: &sm.Mechanisms, HandleNull: mechanismsRequired}
	return encoding.UnmarshalComposite(r, encoding.TypeCodeSASLMechanism, mechanisms)
}
// String renders the sasl-mechanisms frame and its mechanism list on a
// single line.
func (sm *SASLMechanisms) String() string {
	const layout = "SaslMechanisms{Mechanisms : %v}"
	return fmt.Sprintf(layout, sm.Mechanisms)
}
/*
<type class="composite" name="sasl-challenge" source="list" provides="sasl-frame" label="security mechanism challenge">
<descriptor name="amqp:sasl-challenge:list" code="0x00000000:0x00000042"/>
<field name="challenge" type="binary" label="security challenge data" mandatory="true"/>
</type>
*/
// SASLChallenge holds the single field of the sasl-challenge frame
// (amqp:sasl-challenge:list, descriptor code 0x42).
type SASLChallenge struct {
// security challenge data; mandatory in the type definition
Challenge []byte
}
// String returns a fixed, masked representation; the challenge bytes are
// never included in the output.
func (sc *SASLChallenge) String() string {
	const masked = "Challenge{Challenge: ********}"
	return masked
}
// frameBody is an empty marker method.
// NOTE(review): presumably satisfies a frame-body interface declared elsewhere in this package — confirm.
func (sc *SASLChallenge) frameBody() {}
// Marshal encodes the sasl-challenge frame into wr as a composite with type
// code encoding.TypeCodeSASLChallenge. Challenge is never marked Omit.
func (sc *SASLChallenge) Marshal(wr *buffer.Buffer) error {
	fields := []encoding.MarshalField{
		{Value: &sc.Challenge},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeSASLChallenge, fields)
}
// Unmarshal decodes a sasl-challenge frame from r into sc. Challenge carries
// a HandleNull callback that reports it as required.
func (sc *SASLChallenge) Unmarshal(r *buffer.Buffer) error {
	challengeRequired := func() error { return errors.New("saslChallenge.Challenge is required") }

	fields := []encoding.UnmarshalField{
		{Field: &sc.Challenge, HandleNull: challengeRequired},
	}
	return encoding.UnmarshalComposite(r, encoding.TypeCodeSASLChallenge, fields...)
}
/*
<type class="composite" name="sasl-response" source="list" provides="sasl-frame" label="security mechanism response">
<descriptor name="amqp:sasl-response:list" code="0x00000000:0x00000043"/>
<field name="response" type="binary" label="security response data" mandatory="true"/>
</type>
*/
// SASLResponse holds the single field of the sasl-response frame
// (amqp:sasl-response:list, descriptor code 0x43).
type SASLResponse struct {
// security response data; mandatory in the type definition
Response []byte
}
// String returns a fixed, masked representation; the response bytes are
// never included in the output.
func (sr *SASLResponse) String() string {
	const masked = "Response{Response: ********}"
	return masked
}
// frameBody is an empty marker method.
// NOTE(review): presumably satisfies a frame-body interface declared elsewhere in this package — confirm.
func (sr *SASLResponse) frameBody() {}
// Marshal encodes the sasl-response frame into wr as a composite with type
// code encoding.TypeCodeSASLResponse. Response is never marked Omit.
func (sr *SASLResponse) Marshal(wr *buffer.Buffer) error {
	fields := []encoding.MarshalField{
		{Value: &sr.Response},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeSASLResponse, fields)
}
// Unmarshal decodes a sasl-response frame from r into sr. Response carries a
// HandleNull callback that reports it as required.
func (sr *SASLResponse) Unmarshal(r *buffer.Buffer) error {
	responseRequired := func() error { return errors.New("saslResponse.Response is required") }

	fields := []encoding.UnmarshalField{
		{Field: &sr.Response, HandleNull: responseRequired},
	}
	return encoding.UnmarshalComposite(r, encoding.TypeCodeSASLResponse, fields...)
}
/*
<type name="sasl-outcome" class="composite" source="list" provides="sasl-frame">
<descriptor name="amqp:sasl-outcome:list" code="0x00000000:0x00000044"/>
<field name="code" type="sasl-code" mandatory="true"/>
<field name="additional-data" type="binary"/>
</type>
*/
// SASLOutcome holds the fields of the sasl-outcome frame
// (amqp:sasl-outcome:list, descriptor code 0x44).
type SASLOutcome struct {
// the sasl-code of the outcome; mandatory in the type definition
Code encoding.SASLCode
// optional binary additional data; Marshal omits it when empty
AdditionalData []byte
}
// frameBody is an empty marker method.
// NOTE(review): presumably satisfies a frame-body interface declared elsewhere in this package — confirm.
func (so *SASLOutcome) frameBody() {}
// Marshal encodes the sasl-outcome frame into wr as a composite with type
// code encoding.TypeCodeSASLOutcome.
//
// Code is always included; AdditionalData is marked Omit when it is empty.
func (so *SASLOutcome) Marshal(wr *buffer.Buffer) error {
	// Entries are positional: the order here is the order on the wire.
	fields := []encoding.MarshalField{
		{Value: &so.Code},
		{Value: &so.AdditionalData, Omit: len(so.AdditionalData) == 0},
	}
	return encoding.MarshalComposite(wr, encoding.TypeCodeSASLOutcome, fields)
}
// Unmarshal decodes a sasl-outcome frame from r into so.
//
// Code is mandatory in the sasl-outcome type definition and carries a
// HandleNull callback returning an error that names it. AdditionalData is
// optional and has no null handler.
func (so *SASLOutcome) Unmarshal(r *buffer.Buffer) error {
	return encoding.UnmarshalComposite(r, encoding.TypeCodeSASLOutcome, []encoding.UnmarshalField{
		// The error names Code, the field this handler guards;
		// AdditionalData is the optional second field.
		{Field: &so.Code, HandleNull: func() error { return errors.New("saslOutcome.Code is required") }},
		{Field: &so.AdditionalData},
	}...)
}
// String renders the sasl-outcome frame (code and additional data) on a
// single line.
func (so *SASLOutcome) String() string {
	const layout = "SaslOutcome{Code : %v, AdditionalData: %v}"
	return fmt.Sprintf(layout, so.Code, so.AdditionalData)
}