in c/src/core/transport.c [1182:1320]
int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
{
pn_bytes_t name;
uint32_t handle;
bool is_sender;
pn_bytes_t source, target;
pn_durability_t src_dr, tgt_dr;
pn_bytes_t src_exp, tgt_exp;
pn_seconds_t src_timeout, tgt_timeout;
bool src_dynamic, tgt_dynamic;
pn_sequence_t idc;
pn_bytes_t dist_mode;
bool snd_settle, rcv_settle;
uint8_t snd_settle_mode, rcv_settle_mode;
uint64_t max_msgsz;
bool has_props;
pn_bytes_t rem_props = (pn_bytes_t){0, NULL};
pn_amqp_decode_DqESIoQBQBDqESIsIoqseDqESIsIoeqqILqqQRe(payload,
&name, &handle,
&is_sender,
&snd_settle, &snd_settle_mode,
&rcv_settle, &rcv_settle_mode,
&source, &src_dr, &src_exp, &src_timeout, &src_dynamic, &dist_mode,
&target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic,
&idc, &max_msgsz, &has_props, &rem_props);
pn_session_t *ssn = pni_channel_state(transport, channel);
if (!ssn) {
pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel);
return PN_EOS;
}
if (handle > ssn->local_handle_max) {
pn_do_error(transport, "amqp:connection:framing-error",
"remote handle %u is above handle_max %u", handle, ssn->local_handle_max);
return PN_EOS;
}
pn_link_t *link = pni_find_link(ssn, name, is_sender);
if (link && (int32_t)link->state.remote_handle >= 0) {
pn_do_error(transport, "amqp:invalid-field", "link name already attached: %.*s", (int)name.size, name.start);
return PN_EOS;
}
if (!link) { /* Make a new link for the attach */
if (is_sender) {
link = (pn_link_t *) pn_link_new(SENDER, ssn, pn_stringn(name.start, name.size));
} else {
link = (pn_link_t *) pn_link_new(RECEIVER, ssn, pn_stringn(name.start, name.size));
}
}
if (has_props) {
pn_bytes_free(link->remote_properties_raw);
link->remote_properties_raw = pn_bytes_dup(rem_props);
}
pni_map_remote_handle(link, handle);
PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_ACTIVE);
pn_terminus_t *rsrc = &link->remote_source;
if (source.start || src_dynamic) {
pn_terminus_set_type(rsrc, PN_SOURCE);
pn_terminus_set_address_bytes(rsrc, source);
pn_terminus_set_durability(rsrc, src_dr);
set_expiry_policy_from_symbol(rsrc, src_exp);
pn_terminus_set_timeout(rsrc, src_timeout);
pn_terminus_set_dynamic(rsrc, src_dynamic);
pn_terminus_set_distribution_mode(rsrc, symbol2dist_mode(dist_mode));
} else {
pn_terminus_set_type(rsrc, PN_UNSPECIFIED);
}
pn_terminus_t *rtgt = &link->remote_target;
if (target.start || tgt_dynamic) {
pn_terminus_set_type(rtgt, PN_TARGET);
pn_terminus_set_address_bytes(rtgt, target);
pn_terminus_set_durability(rtgt, tgt_dr);
set_expiry_policy_from_symbol(rtgt, tgt_exp);
pn_terminus_set_timeout(rtgt, tgt_timeout);
pn_terminus_set_dynamic(rtgt, tgt_dynamic);
} else {
uint64_t code = 0;
pn_amqp_decode_DqEqqqqqDqqDLqqqqe(payload, &code);
if (code == AMQP_DESC_COORDINATOR) {
pn_terminus_set_type(rtgt, PN_COORDINATOR);
} else if (code == AMQP_DESC_TARGET) {
pn_terminus_set_type(rtgt, PN_TARGET);
} else {
pn_terminus_set_type(rtgt, PN_UNSPECIFIED);
}
}
if (snd_settle)
link->remote_snd_settle_mode = snd_settle_mode;
if (rcv_settle)
link->remote_rcv_settle_mode = rcv_settle_mode;
pn_bytes_t rem_src_properties = (pn_bytes_t) {0, NULL};
pn_bytes_t rem_src_filter = (pn_bytes_t) {0, NULL};
pn_bytes_t rem_src_outcomes = (pn_bytes_t) {0, NULL};
pn_bytes_t rem_src_capabilities = (pn_bytes_t) {0, NULL};
pn_amqp_decode_DqEqqqqqDqEqqqqqRqRqRRee(payload,
&rem_src_properties,
&rem_src_filter,
&rem_src_outcomes,
&rem_src_capabilities);
pn_bytes_free(rsrc->properties_raw);
rsrc->properties_raw = pn_bytes_dup(rem_src_properties);
pn_bytes_free(rsrc->filter_raw);
rsrc->filter_raw = pn_bytes_dup(rem_src_filter);
pn_bytes_free(rsrc->outcomes_raw);
rsrc->outcomes_raw = pn_bytes_dup(rem_src_outcomes);
pn_bytes_free(rsrc->capabilities_raw);
rsrc->capabilities_raw = pn_bytes_dup(rem_src_capabilities);
pn_bytes_t rem_tgt_properties = (pn_bytes_t) {0, NULL};
pn_bytes_t rem_tgt_capabilities = (pn_bytes_t) {0, NULL};
if (pn_terminus_get_type(&link->remote_target) == PN_COORDINATOR) {
// coordinator target only has a capabilities field
pn_amqp_decode_DqEqqqqqDqqDqEReqqqe(payload,
&rem_tgt_capabilities);
} else {
pn_amqp_decode_DqEqqqqqDqqDqEqqqqqRRee(payload,
&rem_tgt_properties,
&rem_tgt_capabilities);
}
pn_bytes_free(rtgt->properties_raw);
rtgt->properties_raw = pn_bytes_dup(rem_tgt_properties);
pn_bytes_free(rtgt->capabilities_raw);
rtgt->capabilities_raw = pn_bytes_dup(rem_tgt_capabilities);
if (!is_sender) {
link->state.delivery_count = idc;
}
if (max_msgsz) {
link->remote_max_message_size = max_msgsz;
}
pn_collector_put_object(transport->connection->collector, link, PN_LINK_REMOTE_OPEN);
return 0;
}