static void qdr_link_deliver_CT()

in src/router_core/transfer.c [733:877]


/**
 * Action handler for a delivery that has arrived on a link.
 *
 * After updating the link/connection/core counters and stamping the delivery,
 * the delivery is dispatched along exactly one of these paths:
 *
 *  1. link->core_endpoint is set: the delivery is handed to
 *     qdrc_endpoint_do_deliver_CT and nothing else is done here.
 *  2. link->connected_link is set: a peer delivery is created on the connected
 *     link, the delivery tag from the action is copied onto it, and the peer is
 *     forwarded.  The original delivery is appended to link->unsettled when it
 *     is not settled; otherwise the action's reference on it is dropped.
 *  3. Otherwise: if link->undelivered is empty, a destination address is
 *     resolved and the delivery is either released (restricted address) or
 *     passed to qdr_link_forward_CT.  If link->undelivered is not empty, the
 *     delivery is appended to that list.
 *
 * @param core     Router core state.
 * @param action   Action carrying the delivery, its "more" flag and the delivery tag.
 * @param discard  When true the function returns immediately without reading the action.
 */
static void qdr_link_deliver_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
    if (discard)
        return;

    qdr_delivery_t *dlv  = action->args.delivery.delivery;
    bool            more = action->args.delivery.more;
    qdr_link_t     *link = qdr_delivery_link(dlv);

    if (!link)
        return;

    //
    // The link's connection may be NULL, so read it once and check it
    // everywhere it is dereferenced below.
    //
    qdr_connection_t *conn = link->conn;
    if (conn)
        conn->last_delivery_time = qdr_core_uptime_ticks(core);

    link->total_deliveries++;

    if (link->link_type == QD_LINK_ENDPOINT && !link->fallback)
        core->deliveries_ingress++;

    //
    // Record the ingress time so we can track the age of this delivery.
    //
    dlv->ingress_time = qdr_core_uptime_ticks(core);

    //
    // If the link is an edge link, mark this delivery as via-edge
    //
    dlv->via_edge = link->edge;

    //
    // If this link has a core_endpoint, direct deliveries to that endpoint.
    //
    if (link->core_endpoint) {
        qdrc_endpoint_do_deliver_CT(core, link->core_endpoint, dlv);
        return;
    }

    if (link->connected_link) {
        //
        // If this is an attach-routed link, put the delivery directly onto the peer link
        //
        qdr_delivery_t *peer = qdr_forward_new_delivery_CT(core, dlv, link->connected_link, dlv->msg);

        //
        // Copy the delivery tag.  For link-routing, the delivery tag must be preserved.
        // NOTE(review): assumes tag_length never exceeds the capacity of peer->tag;
        // confirm that the producer of this action enforces that bound.
        //
        peer->tag_length = action->args.delivery.tag_length;
        memcpy(peer->tag, action->args.delivery.tag, peer->tag_length);

        qdr_forward_deliver_CT(core, link->connected_link, peer);

        if (!dlv->settled) {
            DEQ_INSERT_TAIL(link->unsettled, dlv);
            dlv->where = QDR_DELIVERY_IN_UNSETTLED;
            qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery transfer:  qdr_link_deliver_CT: action-list -> unsettled-list", DLV_ARGS(dlv));
        } else {
            //
            // If the delivery is settled, decrement the ref_count on the delivery.
            // This count was the owned-by-action count.
            //
            qdr_delivery_decref_CT(core, dlv, "qdr_link_deliver_CT - removed from action");
        }
        return;
    }

    //
    // NOTE: The link->undelivered list does not need to be protected by the
    //       connection's work lock for incoming links.  This protection is only
    //       needed for outgoing links.
    //

    if (!DEQ_IS_EMPTY(link->undelivered)) {
        //
        // Take the action reference and use it for undelivered.  Don't decref/incref.
        //
        DEQ_INSERT_TAIL(link->undelivered, dlv);
        dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
        qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery transfer:  qdr_link_deliver_CT: action-list -> undelivered-list", DLV_ARGS(dlv));
        return;
    }

    qdr_link_ref_t *temp_rlink = 0;
    qdr_address_t  *addr       = link->owning_addr;

    if (!addr && dlv->to_addr) {
        //
        // The link has no owning address: look the delivery's own to-address up
        // in the address table, scoped to the connection's tenant space if any.
        //
        if (conn && conn->tenant_space)
            qd_iterator_annotate_space(dlv->to_addr, conn->tenant_space, conn->tenant_space_len);
        qd_hash_retrieve(core->addr_hash, dlv->to_addr, (void**) &addr);

        //
        // core->edge_conn_addr is non-zero ONLY on edge routers, so there is no
        // need to check core->router_mode.
        //
        // The connection on which the delivery arrived should not be
        // QDR_ROLE_EDGE_CONNECTION because we do not want to send it back over
        // the same connection.  A link without a connection is skipped as well:
        // its role cannot be determined and link->conn must not be dereferenced.
        //
        bool edge_uplink_candidate = core->edge_conn_addr
                                     && conn
                                     && conn->role != QDR_ROLE_EDGE_CONNECTION;

        if (!addr) {
            //
            // This is an anonymous delivery but the address that it wants sent to is
            // not in this router's address table. We will send this delivery up the
            // anonymous link to the interior router (if this is an edge router).
            //
            if (edge_uplink_candidate) {
                qdr_address_t *sender_address = core->edge_conn_addr(core->edge_context);
                if (sender_address)
                    addr = sender_address;
            }
        } else if (edge_uplink_candidate && qdr_is_addr_treatment_multicast(addr)) {
            //
            // Multicast address known locally: temporarily append the first link
            // of the edge-connection address to this address's rlinks so that the
            // forward below also covers it.  The temporary ref is removed after
            // forwarding.
            //
            qdr_address_t *sender_address = core->edge_conn_addr(core->edge_context);
            if (sender_address && sender_address != addr) {
                qdr_link_ref_t *sender_rlink = DEQ_HEAD(sender_address->rlinks);
                if (sender_rlink) {
                    temp_rlink = new_qdr_link_ref_t();
                    DEQ_ITEM_INIT(temp_rlink);
                    temp_rlink->link = sender_rlink->link;
                    DEQ_INSERT_TAIL(addr->rlinks, temp_rlink);
                }
            }
        }
    }

    //
    // Deal with any delivery restrictions for this address.
    //
    if (addr && addr->router_control_only && link->link_type != QD_LINK_CONTROL) {
        qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Link forward:  qdr_link_deliver_CT: released dlv", DLV_ARGS(dlv));
        qdr_delivery_release_CT(core, dlv);
        qdr_link_issue_credit_CT(core, link, 1, false);
        qdr_delivery_decref_CT(core, dlv, "qdr_link_deliver_CT - removed from action on restricted access");
    } else {
        //
        // Give the action reference to the qdr_link_forward function. Don't decref/incref.
        //
        qdr_link_forward_CT(core, link, dlv, addr, more);
    }

    //
    // Undo the temporary rlinks entry.  temp_rlink is only ever set while addr
    // is non-NULL, and addr is not reassigned after that point.
    //
    if (temp_rlink) {
        DEQ_REMOVE(addr->rlinks, temp_rlink);
        free_qdr_link_ref_t(temp_rlink);
    }
}