in c/src/core/transport.c [1215:1366]
int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_bytes_t payload)
{
pn_bytes_t name;
uint32_t handle;
bool is_sender;
pn_bytes_t source, target;
pn_durability_t src_dr, tgt_dr;
pn_bytes_t src_exp, tgt_exp;
pn_seconds_t src_timeout, tgt_timeout;
bool src_dynamic, tgt_dynamic;
pn_sequence_t idc;
pn_bytes_t dist_mode;
bool snd_settle, rcv_settle;
uint8_t snd_settle_mode, rcv_settle_mode;
uint64_t max_msgsz;
bool has_props;
pn_data_t *rem_props = pn_data(0);
pn_amqp_decode_DqESIoQBQBDqESIsIoqseDqESIsIoeqqILqqQCe(payload,
&name, &handle,
&is_sender,
&snd_settle, &snd_settle_mode,
&rcv_settle, &rcv_settle_mode,
&source, &src_dr, &src_exp, &src_timeout, &src_dynamic, &dist_mode,
&target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic,
&idc, &max_msgsz, &has_props, rem_props);
char strbuf[128]; // avoid malloc for most link names
char *strheap = (name.size >= sizeof(strbuf)) ? (char *) malloc(name.size + 1) : NULL;
char *strname = strheap ? strheap : strbuf;
if (name.size > 0) strncpy(strname, name.start, name.size);
strname[name.size] = '\0';
pn_session_t *ssn = pni_channel_state(transport, channel);
if (!ssn) {
pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel);
if (strheap) free(strheap);
pn_free(rem_props);
return PN_EOS;
}
if (handle > ssn->local_handle_max) {
pn_do_error(transport, "amqp:connection:framing-error",
"remote handle %u is above handle_max %u", handle, ssn->local_handle_max);
if (strheap) free(strheap);
pn_free(rem_props);
return PN_EOS;
}
pn_link_t *link = pni_find_link(ssn, name, is_sender);
if (link && (int32_t)link->state.remote_handle >= 0) {
pn_do_error(transport, "amqp:invalid-field", "link name already attached: %s", strname);
if (strheap) free(strheap);
pn_free(rem_props);
return PN_EOS;
}
if (!link) { /* Make a new link for the attach */
if (is_sender) {
link = (pn_link_t *) pn_sender(ssn, strname);
} else {
link = (pn_link_t *) pn_receiver(ssn, strname);
}
}
if (strheap) {
free(strheap);
}
if (has_props) {
link->remote_properties = rem_props;
} else {
pn_free(rem_props);
}
pni_map_remote_handle(link, handle);
PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_ACTIVE);
pn_terminus_t *rsrc = &link->remote_source;
if (source.start || src_dynamic) {
pn_terminus_set_type(rsrc, PN_SOURCE);
pn_terminus_set_address_bytes(rsrc, source);
pn_terminus_set_durability(rsrc, src_dr);
set_expiry_policy_from_symbol(rsrc, src_exp);
pn_terminus_set_timeout(rsrc, src_timeout);
pn_terminus_set_dynamic(rsrc, src_dynamic);
pn_terminus_set_distribution_mode(rsrc, symbol2dist_mode(dist_mode));
} else {
pn_terminus_set_type(rsrc, PN_UNSPECIFIED);
}
pn_terminus_t *rtgt = &link->remote_target;
if (target.start || tgt_dynamic) {
pn_terminus_set_type(rtgt, PN_TARGET);
pn_terminus_set_address_bytes(rtgt, target);
pn_terminus_set_durability(rtgt, tgt_dr);
set_expiry_policy_from_symbol(rtgt, tgt_exp);
pn_terminus_set_timeout(rtgt, tgt_timeout);
pn_terminus_set_dynamic(rtgt, tgt_dynamic);
} else {
uint64_t code = 0;
pn_amqp_decode_DqEqqqqqDqqDLqqqqe(payload, &code);
if (code == COORDINATOR) {
pn_terminus_set_type(rtgt, PN_COORDINATOR);
} else if (code == TARGET) {
pn_terminus_set_type(rtgt, PN_TARGET);
} else {
pn_terminus_set_type(rtgt, PN_UNSPECIFIED);
}
}
if (snd_settle)
link->remote_snd_settle_mode = snd_settle_mode;
if (rcv_settle)
link->remote_rcv_settle_mode = rcv_settle_mode;
pn_data_clear(link->remote_source.properties);
pn_data_clear(link->remote_source.filter);
pn_data_clear(link->remote_source.outcomes);
pn_data_clear(link->remote_source.capabilities);
pn_amqp_decode_DqEqqqqqDqEqqqqqCqCqCCee(payload,
link->remote_source.properties,
link->remote_source.filter,
link->remote_source.outcomes,
link->remote_source.capabilities);
pn_data_rewind(link->remote_source.properties);
pn_data_rewind(link->remote_source.filter);
pn_data_rewind(link->remote_source.outcomes);
pn_data_rewind(link->remote_source.capabilities);
pn_data_clear(link->remote_target.properties);
pn_data_clear(link->remote_target.capabilities);
if (pn_terminus_get_type(&link->remote_target) == PN_COORDINATOR) {
// coordinator target only has a capabilities field
pn_amqp_decode_DqEqqqqqDqqDqECeqqqe(payload,
link->remote_target.capabilities);
} else {
pn_amqp_decode_DqEqqqqqDqqDqEqqqqqCCee(payload,
link->remote_target.properties,
link->remote_target.capabilities);
}
pn_data_rewind(link->remote_target.properties);
pn_data_rewind(link->remote_target.capabilities);
if (!is_sender) {
link->state.delivery_count = idc;
}
if (max_msgsz) {
link->remote_max_message_size = max_msgsz;
}
pn_collector_put_object(transport->connection->collector, link, PN_LINK_REMOTE_OPEN);
return 0;
}