int qdr_forward_balanced_CT()

in src/router_core/forwarder.c [750:940]


/**
 * Forward a message sent to a "balanced" address to at most one outbound link.
 *
 * Candidates are the address's local links (addr->rlinks) followed, if no idle
 * local link was found, by the inter-router data links that lead to the routers
 * in addr->rnodes.  Every candidate gets a score (lower is better):
 *
 *   - local link:        undelivered + unsettled deliveries queued on the link
 *   - inter-router link: addr->outstanding_deliveries[conn_bit] + destination cost
 *
 * A candidate "has room" when its capacity exceeds its load (for inter-router
 * links the load is compared before the cost is added).  The lowest scoring
 * candidate with room wins; if none has room the lowest scoring candidate
 * without room is used instead.
 *
 * @param core              router core state
 * @param addr              address being forwarded to
 * @param msg               message to forward
 * @param in_delivery       inbound delivery, may be null
 * @param exclude_inprocess not referenced by this function
 * @param control           must be false; asserted
 * @return 1 if the message was handed to a link, 0 if no destination was
 *         available or a required streaming link could not be obtained
 */
int qdr_forward_balanced_CT(qdr_core_t      *core,
                            qdr_address_t   *addr,
                            qd_message_t    *msg,
                            qdr_delivery_t  *in_delivery,
                            bool             exclude_inprocess,
                            bool             control)
{
    // Balanced treatment is never valid for control messages.
    assert(!control);

    // On first use, create the zeroed array of outstanding delivery counts.
    if (!addr->outstanding_deliveries) {
        addr->outstanding_deliveries = NEW_ARRAY(int, qd_bitmask_width());
        for (int bit = 0; bit < qd_bitmask_width(); ++bit) {
            addr->outstanding_deliveries[bit] = 0;
        }
    }

    // Best candidate that still has room, and best candidate that does not.
    // A conn bit of -1 means the candidate is a local (non inter-router) link.
    qdr_link_t *open_link  = 0;
    int         open_bit   = -1;
    uint32_t    open_score = UINT32_MAX;
    qdr_link_t *full_link  = 0;
    int         full_bit   = -1;
    uint32_t    full_score = UINT32_MAX;

    //
    // Pass 1: local links.  Stop as soon as a candidate with room scores zero,
    // since nothing can beat it.
    //
    for (qdr_link_ref_t *ref = DEQ_HEAD(addr->rlinks);
         ref && open_score != 0;
         ref = DEQ_NEXT(ref)) {
        qdr_link_t *candidate = ref->link;

        // The queue sizes are sampled while holding the connection's work lock.
        sys_mutex_lock(candidate->conn->work_lock);
        uint32_t backlog = DEQ_SIZE(candidate->undelivered) + DEQ_SIZE(candidate->unsettled);
        sys_mutex_unlock(candidate->conn->work_lock);

        bool has_room = candidate->capacity > backlog;

        // Skip links that would result in edge-echo.
        if (qdr_forward_edge_echo_CT(in_delivery, candidate))
            continue;

        if (has_room) {
            if (backlog < open_score) {
                open_link  = candidate;
                open_score = backlog;
            }
        } else if (backlog < full_score) {
            full_link  = candidate;
            full_score = backlog;
        }
    }

    if (open_link && open_score == 0) {
        //
        // An idle local link was found, so the inter-router links are not consulted.
        // Move the head of rlinks to the tail to enhance the appearance of balance
        // when there is little load (see DISPATCH-367).
        //
        if (DEQ_SIZE(addr->rlinks) > 1) {
            qdr_link_ref_t *front = DEQ_HEAD(addr->rlinks);
            DEQ_REMOVE_HEAD(addr->rlinks);
            DEQ_INSERT_TAIL(addr->rlinks, front);
        }
    } else {
        //
        // Pass 2: inter-router links.
        //
        // Work out the mask bit of the message's ingress router (default 0, this
        // router).  It is tested against each destination router's valid_origins.
        //
        int            origin_bit  = 0;
        qd_iterator_t *origin_iter = in_delivery ? in_delivery->origin : 0;

        if (origin_iter) {
            qd_iterator_reset_view(origin_iter, ITER_VIEW_NODE_HASH);
            // NOTE(review): origin_addr is read right after the lookup without an
            // initializer; this relies on qd_hash_retrieve writing the output even
            // on a miss - confirm.
            qdr_address_t *origin_addr;
            qd_hash_retrieve(core->addr_hash, origin_iter, (void*) &origin_addr);
            if (origin_addr && qd_bitmask_cardinality(origin_addr->rnodes) == 1)
                qd_bitmask_first_set(origin_addr->rnodes, &origin_bit);
        }

        int remaining;
        int router_bit;
        for (QD_BITMASK_EACH(addr->rnodes, router_bit, remaining)) {
            qdr_node_t *dest = core->routers_by_mask_bit[router_bit];

            if (!qd_bitmask_value(dest->valid_origins, origin_bit))
                continue;

            // Traffic for the destination leaves via its next hop, or via the
            // destination itself when no next hop is set.
            qdr_node_t *hop       = dest->next_hop ? dest->next_hop : dest;
            int         via_bit   = hop->conn_mask_bit;
            uint8_t     priority  = qdr_forward_effective_priority(msg, addr);
            qdr_link_t *candidate = peer_router_data_link(core, via_bit, priority);
            if (!candidate)
                continue;

            // Room is judged on the raw outstanding count; the node cost is
            // added afterwards as a bias used only for ranking.
            int  score    = addr->outstanding_deliveries[via_bit];
            bool has_room = candidate->capacity > score;
            score += dest->cost;

            if (has_room) {
                if (open_score > score) {
                    open_link  = candidate;
                    open_bit   = via_bit;
                    open_score = score;
                }
            } else if (full_score > score) {
                full_link  = candidate;
                full_bit   = via_bit;
                full_score = score;
            }
        }
    }

    // Prefer a candidate with room; otherwise fall back to the best one without.
    qdr_link_t *chosen_link     = open_link ? open_link : full_link;
    int         chosen_conn_bit = open_link ? open_bit  : full_bit;

    // Zero fanout: there was no available destination at all.
    if (!chosen_link)
        return 0;

    //
    // DISPATCH-1545 (head of line blocking): when the message has not been fully
    // received yet and the connection advertises streaming links, send it on a
    // dedicated streaming link.  Give up if no such link can be obtained.
    //
    if (!qd_message_receive_complete(msg) && chosen_link->conn->connection_info->streaming_links) {
        chosen_link = get_outgoing_streaming_link(core, chosen_link->conn);
        if (!chosen_link)
            return 0;
    }

    qdr_delivery_t *out_delivery = qdr_forward_new_delivery_CT(core, in_delivery, chosen_link, msg);
    qdr_forward_deliver_CT(core, chosen_link, out_delivery);

    //
    // Update the statistics that match where the delivery went.
    //
    if (chosen_conn_bit < 0) {
        // Sent on a local link.
        if (!chosen_link->fallback) {
            addr->deliveries_egress++;
            core->deliveries_egress++;
        }

        if (qdr_connection_route_container(chosen_link->conn)) {
            core->deliveries_egress_route_container++;
            addr->deliveries_egress_route_container++;
        }
    } else {
        // Sent to a peer router.  An unsettled inbound delivery is counted as
        // outstanding against the chosen connection bit, and the outbound
        // delivery records the address and bit it is tracked under.
        if (in_delivery && !in_delivery->settled) {
            addr->outstanding_deliveries[chosen_conn_bit]++;
            out_delivery->tracking_addr     = addr;
            out_delivery->tracking_addr_bit = chosen_conn_bit;
            addr->tracked_deliveries++;
        }

        addr->deliveries_transit++;
        if (chosen_link->link_type == QD_LINK_ROUTER)
            core->deliveries_transit++;
    }

    return 1;
}