in src/router_core/transfer.c [733:877]
/**
 * Action handler for a delivery that has arrived on a link.
 *
 * Reads the delivery and the "more" flag from action->args.delivery, updates
 * link/core statistics, stamps the delivery with its ingress time and
 * via-edge flag, and then dispatches it along one of four paths:
 *
 *   1. link->core_endpoint is set:  hand off to qdrc_endpoint_do_deliver_CT.
 *   2. link->connected_link is set: create a peer delivery on the connected
 *      link (preserving the delivery tag) and forward it.
 *   3. link->undelivered is empty:  resolve the destination address and call
 *      qdr_link_forward_CT (or release the delivery if the address is
 *      restricted to router-control links).
 *   4. otherwise:                   append the delivery to link->undelivered.
 *
 * @param core    Router core state.
 * @param action  Action carrying the delivery, its tag, and the "more" flag.
 * @param discard If true the function returns immediately without touching
 *                the action.
 */
static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
    if (discard)
        return;

    qdr_delivery_t *dlv  = action->args.delivery.delivery;
    bool            more = action->args.delivery.more;
    qdr_link_t     *link = qdr_delivery_link(dlv);

    if (!link)
        return;

    if (link->conn)
        link->conn->last_delivery_time = qdr_core_uptime_ticks(core);

    link->total_deliveries++;

    if (link->link_type == QD_LINK_ENDPOINT && !link->fallback)
        core->deliveries_ingress++;

    //
    // Record the ingress time so we can track the age of this delivery.
    //
    dlv->ingress_time = qdr_core_uptime_ticks(core);

    //
    // If the link is an edge link, mark this delivery as via-edge
    //
    dlv->via_edge = link->edge;

    //
    // If this link has a core_endpoint, direct deliveries to that endpoint.
    //
    if (link->core_endpoint) {
        qdrc_endpoint_do_deliver_CT(core, link->core_endpoint, dlv);
        return;
    }

    if (link->connected_link) {
        //
        // If this is an attach-routed link, put the delivery directly onto the peer link
        //
        qdr_delivery_t *peer = qdr_forward_new_delivery_CT(core, dlv, link->connected_link, dlv->msg);

        //
        // Copy the delivery tag.  For link-routing, the delivery tag must be preserved.
        // NOTE(review): assumes action->args.delivery.tag_length never exceeds the
        // capacity of peer->tag -- confirm against the producer of this action.
        //
        peer->tag_length = action->args.delivery.tag_length;
        memcpy(peer->tag, action->args.delivery.tag, peer->tag_length);

        qdr_forward_deliver_CT(core, link->connected_link, peer);

        if (!dlv->settled) {
            DEQ_INSERT_TAIL(link->unsettled, dlv);
            dlv->where = QDR_DELIVERY_IN_UNSETTLED;
            qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery transfer: qdr_link_deliver_CT: action-list -> unsettled-list", DLV_ARGS(dlv));
        } else {
            //
            // If the delivery is settled, decrement the ref_count on the delivery.
            // This count was the owned-by-action count.
            //
            qdr_delivery_decref_CT(core, dlv, "qdr_link_deliver_CT - removed from action");
        }
        return;
    }

    //
    // NOTE: The link->undelivered list does not need to be protected by the
    //       connection's work lock for incoming links.  This protection is only
    //       needed for outgoing links.
    //
    if (DEQ_IS_EMPTY(link->undelivered)) {
        //
        // temp_rlink, when set, is a temporary link reference appended to
        // addr->rlinks for the duration of the forward call and removed afterwards.
        //
        qdr_link_ref_t *temp_rlink = 0;
        qdr_address_t  *addr       = link->owning_addr;

        if (!addr && dlv->to_addr) {
            qdr_connection_t *conn = link->conn;

            if (conn && conn->tenant_space)
                qd_iterator_annotate_space(dlv->to_addr, conn->tenant_space, conn->tenant_space_len);

            qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr);

            if (!addr) {
                //
                // This is an anonymous delivery but the address that it wants sent to is
                // not in this router's address table. We will send this delivery up the
                // anonymous link to the interior router (if this is an edge router).
                // Only edge routers have a non null core->edge_conn_addr
                //
                // conn is NULL-checked here, consistent with the guards above; if the
                // originating connection is unknown we cannot show the delivery did not
                // arrive over the edge connection, so the uplink is skipped.
                //
                if (core->edge_conn_addr && conn && conn->role != QDR_ROLE_EDGE_CONNECTION) {
                    qdr_address_t *sender_address = core->edge_conn_addr(core->edge_context);
                    if (sender_address) {
                        addr = sender_address;
                    }
                }
            }
            else {
                //
                // (core->edge_conn_addr is non-zero ONLY on edge routers. So there is no need to check if the
                // core->router_mode is edge.
                //
                // The connection on which the delivery arrived should not be QDR_ROLE_EDGE_CONNECTION because
                // we do not want to send it back over the same connections
                //
                // conn is NULL-checked for the same reason as in the branch above.
                //
                if (core->edge_conn_addr && conn && conn->role != QDR_ROLE_EDGE_CONNECTION && qdr_is_addr_treatment_multicast(addr)) {
                    qdr_address_t *sender_address = core->edge_conn_addr(core->edge_context);
                    if (sender_address && sender_address != addr) {
                        qdr_link_ref_t *sender_rlink = DEQ_HEAD(sender_address->rlinks);
                        if (sender_rlink) {
                            temp_rlink = new_qdr_link_ref_t();
                            DEQ_ITEM_INIT(temp_rlink);
                            temp_rlink->link = sender_rlink->link;
                            DEQ_INSERT_TAIL(addr->rlinks, temp_rlink);
                        }
                    }
                }
            }
        }

        //
        // Deal with any delivery restrictions for this address.
        //
        if (addr && addr->router_control_only && link->link_type != QD_LINK_CONTROL) {
            qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Link forward: qdr_link_deliver_CT: released dlv", DLV_ARGS(dlv));
            qdr_delivery_release_CT(core, dlv);
            qdr_link_issue_credit_CT(core, link, 1, false);
            qdr_delivery_decref_CT(core, dlv, "qdr_link_deliver_CT - removed from action on restricted access");
        } else {
            //
            // Give the action reference to the qdr_link_forward function.  Don't decref/incref.
            //
            qdr_link_forward_CT(core, link, dlv, addr, more);
        }

        //
        // Undo the temporary rlink insertion made for the multicast edge-uplink case.
        //
        if (addr && temp_rlink) {
            DEQ_REMOVE(addr->rlinks, temp_rlink);
            free_qdr_link_ref_t(temp_rlink);
        }
    } else {
        //
        // Take the action reference and use it for undelivered.  Don't decref/incref.
        //
        DEQ_INSERT_TAIL(link->undelivered, dlv);
        dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
        qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery transfer: qdr_link_deliver_CT: action-list -> undelivered-list", DLV_ARGS(dlv));
    }
}