// static void write_config_exchange_map()
//
// in src/router_core/exchange_bindings.c [121:273]


//
// Forward declarations of the file-local (static) helpers, grouped by the
// object they operate on.  Each helper is declared exactly once.
//

// exchange helpers
static qdr_exchange_t *find_exchange(qdr_core_t    *core,
                                     qd_iterator_t *identity,
                                     qd_iterator_t *name);
static void write_config_exchange_map(qdr_exchange_t      *ex,
                                      qd_composed_field_t *body);
static void write_config_exchange_list(qdr_exchange_t *ex,
                                       qdr_query_t    *query);

// binding helpers
static qdr_binding_t *qdr_binding(qdr_exchange_t *ex,
                                  qd_iterator_t  *name,
                                  qd_iterator_t  *key,
                                  qd_iterator_t  *next_hop,
                                  int             phase);
static void qdr_binding_free(qdr_binding_t *b);
static qdr_binding_t *find_binding(qdr_core_t    *core,
                                   qd_iterator_t *identity,
                                   qd_iterator_t *name);
static qdr_binding_t *get_binding_at_index(qdr_core_t *core,
                                           int         index);
static void write_config_binding_map(qdr_binding_t       *binding,
                                     qd_composed_field_t *body);
static void write_config_binding_list(qdr_binding_t *binding,
                                      qdr_query_t   *query);

// next hop helpers
static next_hop_t *next_hop(qdr_exchange_t *ex,
                            qd_iterator_t  *address,
                            int             phase);
static void next_hop_release(next_hop_t *next_hop);
static next_hop_t *find_next_hop(qdr_exchange_t *ex,
                                 qd_iterator_t  *address,
                                 int             phase);

// helpers used by the exchange forwarder: gather_next_hops is the callback
// handed to the parse tree search, send_message forwards to one next hop
static bool gather_next_hops(void *handle,
                             const char *pattern,
                             void *payload);
static int send_message(qdr_core_t     *core,
                        next_hop_t     *next_hop,
                        qd_message_t   *msg,
                        qdr_delivery_t *in_delivery,
                        bool            exclude_inprocess,
                        bool            control);


//
// The Exchange Forwarder
//
// Forward a message that arrived on an exchange address.
//
// The message is first offered to the exchange's previous forwarder (if one
// is set), then to the next hop of every binding whose pattern matches the
// message subject, and finally - only when nothing has been forwarded so
// far - to the exchange's alternate next hop (if one is configured).
//
// core              - passed to the old forwarder and used to push the
//                     inbound delivery once it has been accepted
// addr              - the address being forwarded; addr->exchange must be set
// msg               - the message to forward
// in_delivery       - the inbound delivery; may be NULL, in which case the
//                     message is handled as if it were presettled
// exclude_inprocess - passed through unchanged to the forwarding calls
// control           - passed through unchanged to the forwarding calls
//
// Returns the accumulated result of the forwarding calls; zero means the
// message was dropped.
//
int qdr_forward_exchange_CT(qdr_core_t     *core,
                            qdr_address_t  *addr,
                            qd_message_t   *msg,
                            qdr_delivery_t *in_delivery,
                            bool            exclude_inprocess,
                            bool            control)
{
    // with no inbound delivery there is nothing to settle later on
    bool was_presettled = true;
    if (in_delivery)
        was_presettled = in_delivery->settled;

    qdr_exchange_t *exchange = addr->exchange;
    assert(exchange);

    exchange->msgs_received++;

    // honor the disposition for the exchange address (this may not be right??)
    int fan_out = 0;
    if (exchange->old_forwarder) {
        fan_out = exchange->old_forwarder->forward_message(core, addr, msg, in_delivery,
                                                           exclude_inprocess, control);
    }

    // @TODO(kgiusti): de-duplicate this code (cut & paste from multicast
    // forwarder)
    //
    // An unsettled inbound delivery is flagged as settled while forwarding,
    // so that every outgoing delivery is sent presettled.
    //
    // NOTE:  This is the only multicast mode currently supported.  Others will
    //        likely be implemented in the future.
    //
    if (!was_presettled)
        in_delivery->settled = true;

    // the subject is only looked up when the message check for the
    // properties depth succeeds
    qd_iterator_t *subject_iter = NULL;
    if (qd_message_check_depth(msg, QD_DEPTH_PROPERTIES) == QD_MESSAGE_DEPTH_OK)
        subject_iter = qd_message_field_iterator(msg, QD_FIELD_SUBJECT);

    next_hop_list_t transmit_list;
    DEQ_INIT(transmit_list);

    if (subject_iter) {
        // collect the next hops of all bindings that match the subject
        qd_parse_tree_search(exchange->parse_tree, subject_iter, gather_next_hops, &transmit_list);
        qd_iterator_free(subject_iter);
    }

    // When there is somewhere to send the message (a matching next hop or an
    // alternate) it is being routed to an entirely new destination address:
    // clear the origin and the link exclusions on the delivery, and the trace
    // and ingress router annotations on the message.  The next hop may well
    // be reached via the same link/router the message arrived from.
    // @TODO(kgiusti) - loop detection
    if (DEQ_SIZE(transmit_list) > 0 || exchange->alternate) {
        if (in_delivery) {
            in_delivery->origin = 0;
            qd_bitmask_free(in_delivery->link_exclusion);
            in_delivery->link_exclusion = 0;
        }

        qd_message_reset_trace_annotation(msg);
        qd_message_reset_ingress_router_annotation(msg);
    }

    // drain the transmit list, forwarding to each next hop in turn
    for (next_hop_t *hop = DEQ_HEAD(transmit_list); hop; hop = DEQ_HEAD(transmit_list)) {
        DEQ_REMOVE_N(transmit_list, transmit_list, hop);
        hop->on_xmit_list = false;
        assert(hop->qdr_addr);
        // @TODO(kgiusti) - non-recursive handling of next hop if it is an exchange
        fan_out += send_message(exchange->core, hop, msg, in_delivery, exclude_inprocess, control);
    }

    // nothing forwarded so far: fall back to the alternate, if there is one
    if (fan_out == 0 && exchange->alternate) {
        fan_out = send_message(exchange->core, exchange->alternate, msg, in_delivery,
                               exclude_inprocess, control);
        if (fan_out)
            exchange->msgs_alternate++;
    }

    // @TODO(kgiusti): de-duplicate the settlement code (cut & paste from
    // multicast forwarder)
    if (fan_out == 0) {
        exchange->msgs_dropped++;

        // The message went nowhere: put an originally unsettled delivery
        // back into its unsettled state.
        if (!was_presettled)
            in_delivery->settled = false;

        return fan_out;
    }

    exchange->msgs_routed++;

    // The delivery was not presettled and reached at least one destination:
    // accept and settle it, but only once the entire message has been
    // received.
    if (in_delivery && !was_presettled
        && qd_message_receive_complete(qdr_delivery_message(in_delivery))) {
        in_delivery->disposition = PN_ACCEPTED;
        qdr_delivery_push_CT(core, in_delivery);
    }

    return fan_out;
}