int qdr_forward_multicast_CT()

in src/router_core/forwarder.c [464:626]


//
// Multicast forwarder: offer one message to every eligible destination of an address.
//
// Three groups of destinations are visited, in this order:
//   1. the outgoing links listed in addr->rlinks
//      (only when addr->local is false or exclude_inprocess is true),
//   2. one link per distinct next-hop connection leading to the routers in addr->rnodes,
//   3. the subscriptions listed in addr->subscriptions (skipped when exclude_inprocess is true).
//
// Deliveries created for groups 1 and 2 are first parked on a local list and are only
// handed to qdr_forward_deliver_CT() after all three groups have been processed.
//
// Parameters:
//   core              - router core; supplies the address hash, router/connection tables and
//                       the global delivery counters updated here.
//   addr              - address being forwarded to; its per-address counters are updated.
//   msg               - message to forward.
//   in_delivery       - incoming delivery.  It is null-checked here before its link_exclusion
//                       and origin fields are read; it is otherwise passed through unchanged
//                       to the helper functions.
//   exclude_inprocess - when true, in-process subscriptions are not served.
//   control           - when true, next-hop traffic uses the peer router control link.
//
// Returns the fanout: the number of outgoing deliveries created plus the number of
// subscriptions the message was offered to.
//
int qdr_forward_multicast_CT(qdr_core_t      *core,
                             qdr_address_t   *addr,
                             qd_message_t    *msg,
                             qdr_delivery_t  *in_delivery,
                             bool             exclude_inprocess,
                             bool             control)
{
    const bool    skip_origin_lookup = addr->forwarder->bypass_valid_origins;
    int           fanout             = 0;
    qd_bitmask_t *excluded_conns     = in_delivery ? in_delivery->link_exclusion : 0;
    const bool    msg_complete       = qd_message_receive_complete(msg);

    // Deliveries created below wait here until the final hand-off loop.
    qdr_forward_deliver_info_list_t pending;
    DEQ_INIT(pending);

    //
    // Group 1: links attached to this router.
    //
    if (!addr->local || exclude_inprocess) {
        for (qdr_link_ref_t *ref = DEQ_HEAD(addr->rlinks); ref; ref = DEQ_NEXT(ref)) {
            qdr_link_t *link = ref->link;

            // Never send over a link that would produce an edge-echo.
            if (qdr_forward_edge_echo_CT(in_delivery, link))
                continue;

            // A message that is still arriving goes out on a streaming link when the
            // peer connection advertises support for them.
            if (!msg_complete && link->conn->connection_info->streaming_links)
                link = get_outgoing_streaming_link(core, link->conn);

            // The streaming-link lookup may have yielded no link at all.
            if (!link)
                continue;

            qdr_delivery_t *dlv = qdr_forward_new_delivery_CT(core, in_delivery, link, msg);

            // Remember the (link, delivery) pair for the hand-off at the end.
            qdr_forward_deliver_info_t *info = new_qdr_forward_deliver_info_t();
            ZERO(info);
            info->out_dlv  = dlv;
            info->out_link = link;
            DEQ_INSERT_TAIL(pending, info);

            fanout++;

            // Egress is counted only for links that are neither control nor router
            // links and are not flagged as fallback.
            const bool control_or_router = link->link_type == QD_LINK_CONTROL
                                           || link->link_type == QD_LINK_ROUTER;
            if (!control_or_router && !link->fallback) {
                addr->deliveries_egress++;
                core->deliveries_egress++;
            }
        }
    }

    //
    // Group 2: remote routers, reached over the control or data link of each next hop.
    //
    // Work out the mask bit of the ingress router.  The value is only used as a gate:
    // it stays at -1 (and remote forwarding is skipped) when an origin is present, the
    // lookup is not bypassed, and the origin does not resolve to an address with exactly
    // one remote node.  With no origin, or with the lookup bypassed, it is simply 0.
    //
    int            origin       = 0;
    qd_iterator_t *ingress_iter = in_delivery ? in_delivery->origin : 0;

    if (ingress_iter && !skip_origin_lookup) {
        origin = -1;

        // NOTE(review): origin_addr is not initialized here; this relies on
        // qd_hash_retrieve() always storing a value (null on a miss) - confirm.
        qdr_address_t *origin_addr;
        qd_iterator_reset_view(ingress_iter, ITER_VIEW_NODE_HASH);
        qd_hash_retrieve(core->addr_hash, ingress_iter, (void*) &origin_addr);
        if (origin_addr && qd_bitmask_cardinality(origin_addr->rnodes) == 1)
            qd_bitmask_first_set(origin_addr->rnodes, &origin);
    }

    if (origin >= 0) {
        int           rnode_bit;
        qd_bitmask_t *next_hop_conns = qd_bitmask(0);

        //
        // Collapse the destination routers into the set of connections that lead to
        // them.  Several destinations may sit behind the same next hop; collecting the
        // connection bits in a set means that next hop receives a single copy and the
        // routers downstream take care of the remaining fan-out.
        //
        int visited;
        for (QD_BITMASK_EACH(addr->rnodes, rnode_bit, visited)) {
            qdr_node_t *rnode = core->routers_by_mask_bit[rnode_bit];
            if (!rnode)
                continue;

            // Connection on the path to rnode: via its next hop when it has one,
            // otherwise its own connection.
            int hop_bit = rnode->next_hop ? rnode->next_hop->conn_mask_bit : rnode->conn_mask_bit;
            if (hop_bit < 0)
                continue;

            // Honor the exclusion mask carried by the incoming delivery, if any.
            if (excluded_conns && qd_bitmask_value(excluded_conns, hop_bit) != 0)
                continue;

            qd_bitmask_set_bit(next_hop_conns, hop_bit);
        }

        //
        // Drain the set, creating one outgoing delivery per next-hop connection.
        //
        int conn_bit;
        while (qd_bitmask_first_set(next_hop_conns, &conn_bit)) {
            qd_bitmask_clear_bit(next_hop_conns, conn_bit);

            qdr_link_t *hop_link;
            if (control) {
                hop_link = peer_router_control_link(core, conn_bit);
            } else if (msg_complete) {
                hop_link = peer_router_data_link(core, conn_bit, qdr_forward_effective_priority(msg, addr));
            } else {
                // Still-arriving message: use a streaming link on the inter-router connection.
                hop_link = get_outgoing_streaming_link(core, core->rnode_conns_by_mask_bit[conn_bit]);
            }

            if (!hop_link)
                continue;

            qdr_delivery_t *dlv = qdr_forward_new_delivery_CT(core, in_delivery, hop_link, msg);

            // Remember the (link, delivery) pair for the hand-off at the end.
            qdr_forward_deliver_info_t *info = new_qdr_forward_deliver_info_t();
            ZERO(info);
            info->out_dlv  = dlv;
            info->out_link = hop_link;
            DEQ_INSERT_TAIL(pending, info);

            fanout++;
            addr->deliveries_transit++;
            if (hop_link->link_type == QD_LINK_ROUTER)
                core->deliveries_transit++;
        }

        qd_bitmask_free(next_hop_conns);
    }

    //
    // Group 3: in-process subscribers.
    //
    if (!exclude_inprocess) {
        for (qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions); sub; sub = DEQ_NEXT(sub)) {
            qdr_forward_to_subscriber_CT(core, sub, in_delivery, msg, msg_complete);
            fanout++;
            addr->deliveries_to_container++;
        }
    }

    //
    // Hand every parked delivery to its link, releasing the bookkeeping records as we go.
    //
    qdr_forward_deliver_info_t *queued;
    while ((queued = DEQ_HEAD(pending)) != 0) {
        qdr_forward_deliver_CT(core, queued->out_link, queued->out_dlv);
        DEQ_REMOVE_HEAD(pending);
        free_qdr_forward_deliver_info_t(queued);
    }

    return fanout;
}