in src/router_core/modules/address_lookup_client/address_lookup_client.c [346:449]
static void qdr_link_react_to_first_attach_CT(qdr_core_t *core,
qdr_connection_t *conn,
qdr_address_t *addr,
qdr_link_t *link,
qd_direction_t dir,
qdr_terminus_t *source, // must free when done
qdr_terminus_t *target, // must free when done
bool link_route,
bool unavailable,
bool core_endpoint,
bool fallback)
{
link->fallback = fallback;
if (core_endpoint) {
qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target);
source = target = 0; // ownership passed to qdrc_endpoint_do_bound_attach_CT
}
else if (unavailable && qdr_terminus_is_coordinator(dir == QD_INCOMING ? target : source) && !addr) {
qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_COORDINATOR_PRECONDITION_FAILED, true);
}
else if (unavailable) {
qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true);
}
else if (!addr) {
//
// No route to this destination, reject the link
//
qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
}
else if (link_route) {
//
// This is a link-routed destination, forward the attach to the next hop
//
qdr_terminus_t *term = dir == QD_INCOMING ? target : source;
if (qdr_terminus_survives_disconnect(term) && !core->qd->allow_resumable_link_route) {
qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true);
} else {
if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) {
link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name);
}
bool success = qdr_forward_attach_CT(core, addr, link, source, target);
if (success) {
source = target = 0; // ownership passed to qdr_forward_attach_CT
} else {
qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
}
}
}
else if (dir == QD_INCOMING && qdr_terminus_is_coordinator(target)) {
//
// This target terminus is a coordinator.
// If we got here, it means that the coordinator link attach could not be link routed to a broker (or to the next router).
// The router should reject this link because the router cannot coordinate transactions itself.
//
// The attach response should have a null target to indicate refusal and the immediately coming detach.
//
qdr_link_outbound_second_attach_CT(core, link, source, 0);
source = 0; // ownership passed to qdr_link_outbound_second_attach_CT
//
// Now, send back a detach with the error amqp:precondition-failed
//
qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_COORDINATOR_PRECONDITION_FAILED, true);
} else {
//
// Associate the link with the address. With this association, it will be unnecessary
// to do an address lookup for deliveries that arrive on this link.
//
qdr_core_bind_address_link_CT(core, addr, link);
qdr_link_outbound_second_attach_CT(core, link, source, target);
source = target = 0; // ownership passed to qdr_link_outbound_second_attach_CT
//
// Issue the initial credit only if one of the following
// holds:
// - there are destinations for the address
// - the address is that of an exchange (no subscribers allowed)
//
if (dir == QD_INCOMING
&& (DEQ_SIZE(addr->subscriptions)
|| DEQ_SIZE(addr->rlinks)
|| qd_bitmask_cardinality(addr->rnodes)
|| !!addr->exchange
|| (!!addr->fallback
&& (DEQ_SIZE(addr->fallback->subscriptions)
|| DEQ_SIZE(addr->fallback->rlinks)
|| qd_bitmask_cardinality(addr->fallback->rnodes))))) {
qdr_link_issue_credit_CT(core, link, link->capacity, false);
}
//
// If this link came through an edge connection, raise a link event to
// herald that fact.
//
if (link->conn->role == QDR_ROLE_EDGE_CONNECTION)
qdrc_event_link_raise(core, QDRC_EVENT_LINK_EDGE_DATA_ATTACHED, link);
}
if (source)
qdr_terminus_free(source);
if (target)
qdr_terminus_free(target);
}