in src/router_core/exchange_bindings.c [121:273]
//
// Forward declarations of the file-local (static) helpers used in this file.
// Each helper is declared exactly once.
//
static void write_config_exchange_map(qdr_exchange_t      *ex,
                                      qd_composed_field_t *body);
static qdr_exchange_t *find_exchange(qdr_core_t    *core,
                                     qd_iterator_t *identity,
                                     qd_iterator_t *name);
static qdr_binding_t *find_binding(qdr_core_t    *core,
                                   qd_iterator_t *identity,
                                   qd_iterator_t *name);
static void write_config_exchange_list(qdr_exchange_t *ex,
                                       qdr_query_t    *query);
static qdr_binding_t *qdr_binding(qdr_exchange_t *ex,
                                  qd_iterator_t  *name,
                                  qd_iterator_t  *key,
                                  qd_iterator_t  *next_hop,
                                  int             phase);
static void write_config_binding_map(qdr_binding_t       *binding,
                                     qd_composed_field_t *body);
static void qdr_binding_free(qdr_binding_t *b);
static void write_config_binding_list(qdr_binding_t *binding,
                                      qdr_query_t   *query);
static qdr_binding_t *get_binding_at_index(qdr_core_t *core,
                                           int         index);
static next_hop_t *next_hop(qdr_exchange_t *ex,
                            qd_iterator_t  *address,
                            int             phase);
static void next_hop_release(next_hop_t *next_hop);
static next_hop_t *find_next_hop(qdr_exchange_t *ex,
                                 qd_iterator_t  *address,
                                 int             phase);
// Handed to qd_parse_tree_search() as the per-match callback by the exchange
// forwarder; the forwarder passes the address of its transmit list as 'handle'.
static bool gather_next_hops(void       *handle,
                             const char *pattern,
                             void       *payload);
// Called by the exchange forwarder once per next hop; the forwarder adds the
// returned int to its count of forwarded messages.
static int send_message(qdr_core_t     *core,
                        next_hop_t     *next_hop,
                        qd_message_t   *msg,
                        qdr_delivery_t *in_delivery,
                        bool            exclude_inprocess,
                        bool            control);
//
// The Exchange Forwarder
//
// Forward 'msg', which arrived on the exchange address 'addr', to the next
// hops of every binding whose pattern matches the message subject.
//
// Steps, in order:
//   1. count the message as received by the exchange;
//   2. if the exchange has an old_forwarder, let it forward the message first;
//   3. search the exchange's parse tree with the message subject to collect
//      the matching next hops;
//   4. send the message to each collected next hop;
//   5. if nothing has been forwarded and the exchange has an alternate, send
//      the message to the alternate;
//   6. update the routed/dropped counters and the state of the inbound
//      delivery.
//
// 'in_delivery' may be null, in which case the message is treated as
// presettled.
//
// Returns the total of the values returned by the old forwarder and by each
// send_message() call (zero means the message was not forwarded anywhere).
int qdr_forward_exchange_CT(qdr_core_t     *core,
                            qdr_address_t  *addr,
                            qd_message_t   *msg,
                            qdr_delivery_t *in_delivery,
                            bool            exclude_inprocess,
                            bool            control)
{
    // Snapshot the settled state before anything below modifies it.
    const bool was_presettled = !in_delivery || in_delivery->settled;

    qdr_exchange_t *exchange = addr->exchange;
    assert(exchange);
    exchange->msgs_received++;

    // honor the disposition for the exchange address (this may not be right??)
    int sent = 0;
    if (exchange->old_forwarder) {
        sent = exchange->old_forwarder->forward_message(core, addr, msg, in_delivery,
                                                        exclude_inprocess, control);
    }

    // @TODO(kgiusti): de-duplicate this code (cut & paste from multicast
    // forwarder)
    //
    // An unsettled inbound delivery is flagged as settled while forwarding so
    // that every outgoing delivery is presettled.
    //
    // NOTE: This is the only multicast mode currently supported. Others will
    // likely be implemented in the future.
    //
    if (!was_presettled) {
        in_delivery->settled = true;
    }

    // The subject is only looked up when the message has been received at
    // least as far as the properties section.
    qd_iterator_t *subject = NULL;
    if (qd_message_check_depth(msg, QD_DEPTH_PROPERTIES) == QD_MESSAGE_DEPTH_OK) {
        subject = qd_message_field_iterator(msg, QD_FIELD_SUBJECT);
    }

    // Collect the next hops of all matching bindings.
    next_hop_list_t pending_hops;
    DEQ_INIT(pending_hops);
    if (subject) {
        qd_parse_tree_search(exchange->parse_tree, subject, gather_next_hops, &pending_hops);
        qd_iterator_free(subject);
    }

    // When there is somewhere to send the message (matching next hops or an
    // alternate) it is being routed to an entirely new destination address.
    // Clear the origin and excluded-link state on the delivery and reset the
    // trace and ingress annotations on the message, because the next hop may
    // be reached via the same link/router the message arrived from.
    // @TODO(kgiusti) - loop detection
    const bool rerouting = DEQ_SIZE(pending_hops) > 0 || exchange->alternate;
    if (rerouting) {
        if (in_delivery) {
            in_delivery->origin = 0;
            qd_bitmask_free(in_delivery->link_exclusion);
            in_delivery->link_exclusion = 0;
        }
        qd_message_reset_trace_annotation(msg);
        qd_message_reset_ingress_router_annotation(msg);
    }

    // Drain the list from the head, sending to each next hop as it is removed.
    // (The first DEQ_REMOVE_N argument names the link field, not the list.)
    for (next_hop_t *hop = DEQ_HEAD(pending_hops); hop; hop = DEQ_HEAD(pending_hops)) {
        DEQ_REMOVE_N(transmit_list, pending_hops, hop);
        hop->on_xmit_list = false;
        assert(hop->qdr_addr);
        // @TODO(kgiusti) - non-recursive handling of next hop if it is an exchange
        sent += send_message(exchange->core, hop, msg, in_delivery, exclude_inprocess, control);
    }

    // Nothing forwarded so far: fall back to the alternate, if configured.
    if (sent == 0 && exchange->alternate) {
        sent = send_message(exchange->core, exchange->alternate, msg, in_delivery,
                            exclude_inprocess, control);
        if (sent) {
            exchange->msgs_alternate++;
        }
    }

    // @TODO(kgiusti): de-duplicate the settlement code (cut & paste from
    // multicast forwarder)
    if (sent != 0) {
        exchange->msgs_routed++;
        //
        // Forwarded to at least one destination: an originally unsettled
        // delivery is accepted and pushed only once the entire message has
        // been received.
        //
        if (in_delivery && !was_presettled
            && qd_message_receive_complete(qdr_delivery_message(in_delivery))) {
            in_delivery->disposition = PN_ACCEPTED;
            qdr_delivery_push_CT(core, in_delivery);
        }
    } else {
        exchange->msgs_dropped++;
        //
        // Not forwarded anywhere: put an originally unsettled delivery back
        // into its unsettled state.
        //
        if (!was_presettled) {
            in_delivery->settled = false;
        }
    }

    return sent;
}