in src/router_core/connections.c [1740:1899]
/**
 * Action handler for the first attach of a link.
 *
 * Registers the link with the core and with its connection, validates the link type
 * against the connection role, and then either answers the attach, hands it to the
 * address-lookup handler, or rejects it with an outbound detach.
 *
 * Ownership handled here:
 *  - The action carries the source and target termini.  On the discard path and on every
 *    reject path they are freed; otherwise they are passed on to the second-attach,
 *    address-lookup or downlink handling.
 *  - The action may carry a reference on an initial delivery.  That reference is released
 *    exactly once on every path through this function.
 *
 * @param core    Router core.
 * @param action  Action whose args.connection fields describe the attach.
 * @param discard If true, the action is not to be processed; only release what it holds.
 */
static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
    qdr_connection_t *conn        = safe_deref_qdr_connection_t(action->args.connection.conn);
    qdr_link_t       *link        = safe_deref_qdr_link_t(action->args.connection.link);
    qdr_delivery_t   *initial_dlv = action->args.connection.initial_delivery;
    qd_direction_t    dir         = action->args.connection.dir;
    qdr_terminus_t   *source      = action->args.connection.source;
    qdr_terminus_t   *target      = action->args.connection.target;

    if (discard || !conn || !link) {
        //
        // Nothing will consume the action's resources: drop the delivery reference and
        // free the termini so they are not leaked.
        //
        if (initial_dlv)
            qdr_delivery_decref(core, initial_dlv,
                                "qdr_link_inbound_first_attach_CT - discarding action");
        qdr_terminus_free(source);
        qdr_terminus_free(target);
        return;
    }

    //
    // Start the attach count.
    //
    link->attach_count = 1;

    //
    // Put the link into the proper lists for tracking.
    //
    DEQ_INSERT_TAIL(core->open_links, link);
    qdr_add_link_ref(&conn->links, link, QDR_LINK_LIST_CLASS_CONNECTION);

    //
    // Mark the link as an edge link if it's inside an edge connection.
    //
    link->edge = (conn->role == QDR_ROLE_EDGE_CONNECTION);

    //
    // Reject any attaches of inter-router links that arrive on connections that are not inter-router.
    //
    if (((link->link_type == QD_LINK_CONTROL || link->link_type == QD_LINK_ROUTER) &&
         conn->role != QDR_ROLE_INTER_ROUTER)) {
        link->link_type = QD_LINK_ENDPOINT; // Demote the link type to endpoint if this is not an inter-router connection
        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_FORBIDDEN, true);
        qdr_terminus_free(source);
        qdr_terminus_free(target);
        if (initial_dlv)
            qdr_delivery_decref(core, initial_dlv,
                                "qdr_link_inbound_first_attach_CT - attach rejected");
        qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Router attach forbidden on non-inter-router connection", conn->identity);
        return;
    }

    //
    // Reject ENDPOINT attaches if this is an inter-router connection _and_ there is no
    // CONTROL link on the connection.  This will prevent endpoints from using inter-router
    // listeners for normal traffic but will not prevent routed-links from being established.
    //
    if (conn->role == QDR_ROLE_INTER_ROUTER && link->link_type == QD_LINK_ENDPOINT &&
        core->control_links_by_mask_bit[conn->mask_bit] == 0) {
        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_WRONG_ROLE, true);
        qdr_terminus_free(source);
        qdr_terminus_free(target);
        if (initial_dlv)
            qdr_delivery_decref(core, initial_dlv,
                                "qdr_link_inbound_first_attach_CT - attach rejected");
        qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Endpoint attach forbidden on inter-router connection", conn->identity);
        return;
    }

    char   source_str[1000];
    char   target_str[1000];
    size_t source_len = 1000;
    size_t target_len = 1000;

    source_str[0] = '\0';
    target_str[0] = '\0';

    //
    // Grab the formatted terminus strings before we schedule any IO-thread processing that
    // might get ahead of us and free the terminus objects before we issue the log.
    //
    if (qd_log_enabled(core->log, QD_LOG_INFO)) {
        qdr_terminus_format(source, source_str, &source_len);
        qdr_terminus_format(target, target_str, &target_len);
    }

    if (dir == QD_INCOMING) {
        //
        // Handle incoming link cases
        //

        //
        // The initial delivery is only consumed on the outgoing path below.  If one was
        // supplied for an incoming link, release the action's reference here so it is
        // not leaked.
        //
        if (initial_dlv) {
            qdr_delivery_decref(core, initial_dlv,
                                "qdr_link_inbound_first_attach_CT - unused on incoming link");
            initial_dlv = 0;
        }

        switch (link->link_type) {
        case QD_LINK_ENDPOINT: {
            if (qdr_terminus_is_anonymous(target)) {
                link->owning_addr = 0;
                qdr_link_outbound_second_attach_CT(core, link, source, target);
                qdr_link_issue_credit_CT(core, link, link->capacity, false);
            } else {
                //
                // This link has a target address
                //
                if (core->addr_lookup_handler)
                    core->addr_lookup_handler(core->addr_lookup_context, conn, link, dir, source, target);
                else {
                    qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
                    qdr_terminus_free(source);
                    qdr_terminus_free(target);
                    qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Endpoint attach failed - no address lookup handler", conn->identity);
                    return;
                }
            }
            break;
        }

        case QD_LINK_ROUTER:
            qdr_attach_link_data_CT(core, conn, link);
            // fall-through:
        case QD_LINK_CONTROL:
            qdr_link_outbound_second_attach_CT(core, link, source, target);
            qdr_link_issue_credit_CT(core, link, link->capacity, false);
            break;

        case QD_LINK_EDGE_DOWNLINK:
            // NOTE(review): nothing is done with source/target here - confirm whether an
            // incoming edge downlink can actually occur and who owns the termini if so.
            break;
        }
    } else {
        //
        // Handle outgoing link cases
        //
        if (initial_dlv) {
            qdr_link_process_initial_delivery_CT(core, link, initial_dlv);
            qdr_delivery_decref(core, initial_dlv,
                                "qdr_link_inbound_first_attach_CT - dropping action reference");
            initial_dlv = 0;
        }

        switch (link->link_type) {
        case QD_LINK_ENDPOINT: {
            if (core->addr_lookup_handler)
                core->addr_lookup_handler(core->addr_lookup_context, conn, link, dir, source, target);
            else {
                // The delivery reference was already released above.
                qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
                qdr_terminus_free(source);
                qdr_terminus_free(target);
                qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"] Endpoint attach failed - no address lookup handler", conn->identity);
                return;
            }
            break;
        }

        case QD_LINK_CONTROL:
            qdr_attach_link_control_CT(core, conn, link);
            qdr_link_outbound_second_attach_CT(core, link, source, target);
            break;

        case QD_LINK_ROUTER:
            qdr_attach_link_data_CT(core, conn, link);
            qdr_link_outbound_second_attach_CT(core, link, source, target);
            break;

        case QD_LINK_EDGE_DOWNLINK:
            qdr_attach_link_downlink_CT(core, conn, link, source);
            qdr_link_outbound_second_attach_CT(core, link, source, target);
            break;
        }
    }

    qd_log(core->log, QD_LOG_INFO, "[C%"PRIu64"][L%"PRIu64"] Link attached: dir=%s source=%s target=%s",
           conn->identity, link->identity, dir == QD_INCOMING ? "in" : "out", source_str, target_str);
}