in src/router_core/forwarder.c [464:626]
//
// Multicast forwarder: hand one copy of 'msg' to every destination known for 'addr'.
//
// The work is done in four stages, in this order:
//   1. Create an outgoing delivery for each eligible link in addr->rlinks.
//   2. Create an outgoing delivery for each inter-router connection that leads
//      to a node in addr->rnodes (at most one per connection).
//   3. Pass the message to each entry of addr->subscriptions.
//   4. Hand every delivery created in stages 1 and 2 to qdr_forward_deliver_CT.
//
// Parameters:
//   core              - router core; supplies lookup tables and global counters.
//   addr              - the address being forwarded to; its per-address counters
//                       are updated as copies are made.
//   msg               - the message to forward.
//   in_delivery       - the inbound delivery; tested for null before its
//                       link_exclusion and origin fields are read.
//   exclude_inprocess - when true, stage 3 is skipped and stage 1 always runs;
//                       when false, stage 1 runs only if addr->local is unset.
//   control           - when true, stage 2 uses the peer's control link instead
//                       of a data or streaming link.
//
// Returns the fanout: the number of link deliveries created plus the number of
// subscriptions the message was passed to.
//
int qdr_forward_multicast_CT(qdr_core_t *core,
                             qdr_address_t *addr,
                             qd_message_t *msg,
                             qdr_delivery_t *in_delivery,
                             bool exclude_inprocess,
                             bool control)
{
    bool          skip_origin_lookup = addr->forwarder->bypass_valid_origins;
    int           fanout             = 0;
    qd_bitmask_t *excluded_conns     = in_delivery ? in_delivery->link_exclusion : 0;
    bool          msg_complete       = qd_message_receive_complete(msg);

    //
    // Deliveries are only queued here while destinations are being collected;
    // they are pushed out in one pass at the end of the function.
    //
    qdr_forward_deliver_info_list_t pending;
    DEQ_INIT(pending);

    //
    // Stage 1: links attached to this address.
    //
    if (exclude_inprocess || !addr->local) {
        for (qdr_link_ref_t *ref = DEQ_HEAD(addr->rlinks); ref; ref = DEQ_NEXT(ref)) {
            qdr_link_t *link = ref->link;

            //
            // Skip links that would echo the message back toward the edge.
            //
            if (qdr_forward_edge_echo_CT(in_delivery, link))
                continue;

            //
            // A message that is still arriving goes out on a streaming link
            // when the connection offers them. The lookup may yield no link.
            //
            if (!msg_complete && link->conn->connection_info->streaming_links)
                link = get_outgoing_streaming_link(core, link->conn);

            if (!link)
                continue;

            qdr_delivery_t *dlv = qdr_forward_new_delivery_CT(core, in_delivery, link, msg);

            qdr_forward_deliver_info_t *entry = new_qdr_forward_deliver_info_t();
            ZERO(entry);
            entry->out_dlv  = dlv;
            entry->out_link = link;
            DEQ_INSERT_TAIL(pending, entry);
            fanout++;

            //
            // Egress counters exclude control links, router links and fallback links.
            //
            bool router_facing = link->link_type == QD_LINK_CONTROL || link->link_type == QD_LINK_ROUTER;
            if (!router_facing && !link->fallback) {
                addr->deliveries_egress++;
                core->deliveries_egress++;
            }
        }
    }

    //
    // Stage 2 gate: resolve the mask bit of the ingress router.
    //
    // 'origin' is used only as a go/no-go flag below. It stays negative (and
    // remote forwarding is skipped) when an origin lookup is required but the
    // origin's address is not in the hash or does not map to exactly one node.
    // With no origin iterator, or with the lookup bypassed, it is simply zero.
    //
    int            origin      = 0;
    qd_iterator_t *origin_iter = in_delivery ? in_delivery->origin : 0;
    if (origin_iter && !skip_origin_lookup) {
        origin = -1;
        qd_iterator_reset_view(origin_iter, ITER_VIEW_NODE_HASH);
        qdr_address_t *origin_addr;
        qd_hash_retrieve(core->addr_hash, origin_iter, (void*) &origin_addr);
        if (origin_addr && qd_bitmask_cardinality(origin_addr->rnodes) == 1)
            qd_bitmask_first_set(origin_addr->rnodes, &origin);
    }

    //
    // Stage 2: next-hop connections for remote destinations.
    //
    if (origin >= 0) {
        qd_bitmask_t *hop_conns = qd_bitmask(0);
        int           node_bit;
        int           each_state;

        //
        // Collapse the destination nodes into the set of connections used to
        // reach them. Several nodes may share one next-hop; collecting the
        // connection bits in a mask means each connection gets a single copy.
        //
        for (QD_BITMASK_EACH(addr->rnodes, node_bit, each_state)) {
            qdr_node_t *node = core->routers_by_mask_bit[node_bit];
            if (!node)
                continue;

            // Use the next-hop's connection when there is one, else the node's own.
            int bit = node->next_hop ? node->next_hop->conn_mask_bit : node->conn_mask_bit;
            if (bit < 0)
                continue;

            // Honor the inbound delivery's exclusion mask, if it has one.
            if (excluded_conns && qd_bitmask_value(excluded_conns, bit) != 0)
                continue;

            qd_bitmask_set_bit(hop_conns, bit);
        }

        //
        // Drain the set, creating one delivery per connection.
        //
        int hop_bit;
        while (qd_bitmask_first_set(hop_conns, &hop_bit)) {
            qd_bitmask_clear_bit(hop_conns, hop_bit);

            //
            // Link choice: control traffic uses the control link; otherwise a
            // completely received message uses the priority data link and a
            // message still arriving uses a streaming link.
            //
            qdr_link_t *hop_link;
            if (control) {
                hop_link = peer_router_control_link(core, hop_bit);
            } else if (msg_complete) {
                hop_link = peer_router_data_link(core, hop_bit, qdr_forward_effective_priority(msg, addr));
            } else {
                hop_link = get_outgoing_streaming_link(core, core->rnode_conns_by_mask_bit[hop_bit]);
            }

            if (!hop_link)
                continue;

            qdr_delivery_t *dlv = qdr_forward_new_delivery_CT(core, in_delivery, hop_link, msg);

            qdr_forward_deliver_info_t *entry = new_qdr_forward_deliver_info_t();
            ZERO(entry);
            entry->out_dlv  = dlv;
            entry->out_link = hop_link;
            DEQ_INSERT_TAIL(pending, entry);
            fanout++;

            // The per-address transit count is always bumped; the core-wide
            // count only for router-type links.
            addr->deliveries_transit++;
            if (hop_link->link_type == QD_LINK_ROUTER)
                core->deliveries_transit++;
        }

        qd_bitmask_free(hop_conns);
    }

    //
    // Stage 3: in-process subscribers.
    //
    if (!exclude_inprocess) {
        for (qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions); sub; sub = DEQ_NEXT(sub)) {
            qdr_forward_to_subscriber_CT(core, sub, in_delivery, msg, msg_complete);
            fanout++;
            addr->deliveries_to_container++;
        }
    }

    //
    // Stage 4: push out every queued delivery, releasing each list entry as we go.
    //
    qdr_forward_deliver_info_t *queued;
    while ((queued = DEQ_HEAD(pending)) != 0) {
        qdr_forward_deliver_CT(core, queued->out_link, queued->out_dlv);
        DEQ_REMOVE_HEAD(pending);
        free_qdr_forward_deliver_info_t(queued);
    }

    return fanout;
}