in src/router_core/forwarder.c [750:940]
/**
 * Forward a message using balanced distribution: deliver it to exactly one
 * destination, choosing the candidate link with the smallest "value".
 *
 * Candidates are the local links in addr->rlinks and, unless a local link with
 * value zero was already found, the inter-router data links leading toward the
 * routers in addr->rnodes. A candidate is "eligible" while its load is below
 * the link's capacity. The best eligible link is preferred; the best ineligible
 * link is used only when no eligible link exists.
 *
 * @param core              router core instance
 * @param addr              destination address. This function:
 *                          - allocates its outstanding_deliveries array on first use;
 *                          - may rotate its rlinks list;
 *                          - updates its delivery counters.
 * @param msg               message being forwarded
 * @param in_delivery       inbound delivery; may be NULL. This function checks
 *                          for NULL before dereferencing it, and the origin
 *                          lookup and outstanding-delivery tracking are skipped
 *                          when it is NULL.
 * @param exclude_inprocess not referenced by this function
 * @param control           must be false; control messages never use balanced
 *                          treatment (asserted)
 * @return 1 if an outbound delivery was created and forwarded, 0 if there was no
 *         usable destination (or no streaming link could be obtained)
 */
int qdr_forward_balanced_CT(qdr_core_t *core,
                            qdr_address_t *addr,
                            qd_message_t *msg,
                            qdr_delivery_t *in_delivery,
                            bool exclude_inprocess,
                            bool control)
{
    //
    // Control messages should never use balanced treatment.
    //
    assert(!control);

    //
    // If this is the first time through here, allocate the array for outstanding delivery counts.
    // Read the width once so the allocation size and the initialization bound always agree.
    //
    if (addr->outstanding_deliveries == 0) {
        int width = qd_bitmask_width();
        addr->outstanding_deliveries = NEW_ARRAY(int, width);
        for (int i = 0; i < width; i++)
            addr->outstanding_deliveries[i] = 0;
    }

    qdr_link_t *best_eligible_link       = 0;
    int         best_eligible_conn_bit   = -1;
    uint32_t    eligible_link_value      = UINT32_MAX;
    qdr_link_t *best_ineligible_link     = 0;
    int         best_ineligible_conn_bit = -1;
    uint32_t    ineligible_link_value    = UINT32_MAX;

    //
    // Find all the possible outbound links for this delivery, searching for the one with the
    // smallest eligible value:
    //   - local link:        value = undelivered + unsettled deliveries queued on the link
    //   - inter-router link: value = outstanding_deliveries[next-hop conn bit] + router cost
    // A link is ineligible if its load (before any cost is added) is equal to or greater than
    // the link's capacity.
    //
    // If there are no eligible links, use the best ineligible link. Zero fanout should be returned
    // only if there are no available destinations.
    //

    //
    // Start with the local links. Stop early once a zero-value (best possible) link is found.
    //
    qdr_link_ref_t *link_ref = DEQ_HEAD(addr->rlinks);
    while (link_ref && eligible_link_value != 0) {
        qdr_link_t *link = link_ref->link;

        // The link's delivery lists are read under the connection's work lock.
        sys_mutex_lock(link->conn->work_lock);
        uint32_t value = DEQ_SIZE(link->undelivered) + DEQ_SIZE(link->unsettled);
        sys_mutex_unlock(link->conn->work_lock);
        bool eligible = link->capacity > value;

        //
        // Only consider links that do not result in edge-echo.
        //
        if (!qdr_forward_edge_echo_CT(in_delivery, link)) {
            //
            // If this is the best eligible link so far, record the fact.
            // Otherwise, if this is the best ineligible link, make note of that.
            //
            if (eligible && eligible_link_value > value) {
                best_eligible_link  = link;
                eligible_link_value = value;
            } else if (!eligible && ineligible_link_value > value) {
                best_ineligible_link  = link;
                ineligible_link_value = value;
            }
        }
        link_ref = DEQ_NEXT(link_ref);
    }

    //
    // If we haven't already found a link with zero (best possible) value, check the
    // inter-router links as well.
    //
    if (!best_eligible_link || eligible_link_value > 0) {
        //
        // Get the mask bit associated with the ingress router for the message.
        // This will be compared against the "valid_origin" masks for each
        // candidate destination router.
        //
        int origin = 0; // default to this router
        qd_iterator_t *ingress_iter = in_delivery ? in_delivery->origin : 0;
        if (ingress_iter) {
            qd_iterator_reset_view(ingress_iter, ITER_VIEW_NODE_HASH);
            // Initialized so the check below never reads an indeterminate pointer,
            // regardless of whether qd_hash_retrieve writes the out-param on a miss.
            qdr_address_t *origin_addr = 0;
            qd_hash_retrieve(core->addr_hash, ingress_iter, (void*) &origin_addr);
            if (origin_addr && qd_bitmask_cardinality(origin_addr->rnodes) == 1)
                qd_bitmask_first_set(origin_addr->rnodes, &origin);
        }

        int c;
        int node_bit;
        for (QD_BITMASK_EACH(addr->rnodes, node_bit, c)) {
            qdr_node_t *rnode = core->routers_by_mask_bit[node_bit];

            if (qd_bitmask_value(rnode->valid_origins, origin)) {
                // Deliveries to a remote router leave via the connection to its next hop
                // (or directly, if it has no next hop).
                qdr_node_t *next_node = rnode->next_hop ? rnode->next_hop : rnode;
                int         conn_bit  = next_node->conn_mask_bit;
                uint8_t     priority  = qdr_forward_effective_priority(msg, addr);
                qdr_link_t *link      = peer_router_data_link(core, conn_bit, priority);
                if (!link) continue;

                int  value    = addr->outstanding_deliveries[conn_bit];
                bool eligible = link->capacity > value;

                //
                // Link is a candidate, adjust the value by the bias (node cost).
                //
                value += rnode->cost;
                if (eligible && eligible_link_value > value) {
                    best_eligible_link     = link;
                    best_eligible_conn_bit = conn_bit;
                    eligible_link_value    = value;
                } else if (!eligible && ineligible_link_value > value) {
                    best_ineligible_link     = link;
                    best_ineligible_conn_bit = conn_bit;
                    ineligible_link_value    = value;
                }
            }
        }
    } else if (best_eligible_link) {
        //
        // Reached only when a zero-value local link was found.
        // Rotate the rlinks list to enhance the appearance of balance when there is
        // little load (see DISPATCH-367)
        //
        if (DEQ_SIZE(addr->rlinks) > 1) {
            link_ref = DEQ_HEAD(addr->rlinks);
            DEQ_REMOVE_HEAD(addr->rlinks);
            DEQ_INSERT_TAIL(addr->rlinks, link_ref);
        }
    }

    qdr_link_t *chosen_link     = 0;
    int         chosen_conn_bit = -1;
    if (best_eligible_link) {
        chosen_link     = best_eligible_link;
        chosen_conn_bit = best_eligible_conn_bit;
    } else if (best_ineligible_link) {
        chosen_link     = best_ineligible_link;
        chosen_conn_bit = best_ineligible_conn_bit;
    }

    if (chosen_link) {
        // DISPATCH-1545 (head of line blocking): if the message is still streaming in,
        // see if the chosen connection allows us to open a dedicated link for streaming.
        // If no such link can be obtained, nothing is forwarded.
        if (!qd_message_receive_complete(msg) && chosen_link->conn->connection_info->streaming_links) {
            chosen_link = get_outgoing_streaming_link(core, chosen_link->conn);
            if (!chosen_link)
                return 0;
        }

        qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg);
        qdr_forward_deliver_CT(core, chosen_link, out_delivery);

        //
        // Bump the appropriate counter based on where we sent the delivery.
        //
        if (chosen_conn_bit >= 0) { // sent to peer router
            //
            // If the delivery is unsettled account for the outstanding delivery sent inter-router,
            // and record the address/bit on the outbound delivery. NOTE(review): presumably so the
            // count can be decremented when the delivery settles - confirm against the settle path.
            //
            if (in_delivery && !in_delivery->settled) {
                addr->outstanding_deliveries[chosen_conn_bit]++;
                out_delivery->tracking_addr     = addr;
                out_delivery->tracking_addr_bit = chosen_conn_bit;
                addr->tracked_deliveries++;
            }
            addr->deliveries_transit++;
            if (chosen_link->link_type == QD_LINK_ROUTER)
                core->deliveries_transit++;
        }
        else {
            // Sent on a local link; fallback links are not counted as egress.
            if (!chosen_link->fallback) {
                addr->deliveries_egress++;
                core->deliveries_egress++;
            }
            if (qdr_connection_route_container(chosen_link->conn)) {
                core->deliveries_egress_route_container++;
                addr->deliveries_egress_route_container++;
            }
        }
        return 1;
    }
    return 0;
}