void qdr_link_cleanup_deliveries_CT()

in src/router_core/connections.c [791:996]


/**
 * Detach every delivery held by a link and dispose of each one.
 *
 * The work happens in two phases:
 *
 *   1. While holding conn->work_lock, the link's updated, undelivered,
 *      unsettled and settled lists are moved into local lists.  Every
 *      delivery on the last three lists is marked QDR_DELIVERY_NOWHERE and
 *      its link_work is released and cleared.
 *
 *   2. With the lock dropped, each local list is drained: peers are
 *      released/failed/unlinked as appropriate, counters are updated, the
 *      delivery's link safe-pointer is nullified, and the reference that the
 *      list held on the delivery is dropped.
 *
 * @param core         Router core; passed to every helper and owner of the
 *                     dropped_presettled_deliveries counter.
 * @param conn         Connection whose work_lock guards the link's lists.
 * @param link         Link whose delivery lists are emptied.
 * @param on_shutdown  When true, each delivery's tracking_addr is cleared in
 *                     phase 1, so the per-address accounting in the unsettled
 *                     pass of phase 2 is skipped.
 */
void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link, bool on_shutdown)
{
    qdr_delivery_ref_list_t orphaned_updates;
    qdr_delivery_list_t     orphaned_undelivered;
    qdr_delivery_list_t     orphaned_unsettled;
    qdr_delivery_list_t     orphaned_settled;
    qdr_delivery_t         *cur;

    //
    // Phase 1: take ownership of the link's lists under the work lock.
    //
    sys_mutex_lock(conn->work_lock);

    DEQ_MOVE(link->updated_deliveries, orphaned_updates);

    DEQ_MOVE(link->undelivered, orphaned_undelivered);
    for (cur = DEQ_HEAD(orphaned_undelivered); cur; cur = DEQ_NEXT(cur)) {
        assert(cur->where == QDR_DELIVERY_IN_UNDELIVERED);
        if (cur->presettled) {
            // A presettled delivery that never left the undelivered list counts as dropped.
            core->dropped_presettled_deliveries++;
        }
        cur->where = QDR_DELIVERY_NOWHERE;
        if (on_shutdown) {
            cur->tracking_addr = 0;
        }
        qdr_link_work_release(cur->link_work);
        cur->link_work = 0;
    }

    DEQ_MOVE(link->unsettled, orphaned_unsettled);
    for (cur = DEQ_HEAD(orphaned_unsettled); cur; cur = DEQ_NEXT(cur)) {
        assert(cur->where == QDR_DELIVERY_IN_UNSETTLED);
        cur->where = QDR_DELIVERY_NOWHERE;
        qdr_link_work_release(cur->link_work);
        cur->link_work = 0;
        if (on_shutdown) {
            cur->tracking_addr = 0;
        }
    }

    DEQ_MOVE(link->settled, orphaned_settled);
    for (cur = DEQ_HEAD(orphaned_settled); cur; cur = DEQ_NEXT(cur)) {
        assert(cur->where == QDR_DELIVERY_IN_SETTLED);
        cur->where = QDR_DELIVERY_NOWHERE;
        qdr_link_work_release(cur->link_work);
        cur->link_work = 0;
        if (on_shutdown) {
            cur->tracking_addr = 0;
        }
    }

    sys_mutex_unlock(conn->work_lock);

    //
    // Phase 2a: drop the references held by the 'updated' list.
    //
    qdr_delivery_ref_t *update_ref;
    while ((update_ref = DEQ_HEAD(orphaned_updates)) != 0) {
        // Bring the global and per-link delivery counters up to date.
        qdr_delivery_increment_counters_CT(core, update_ref->dlv);
        qd_nullify_safe_ptr(&update_ref->dlv->link_sp);

        // Give up the reference this list entry held, then discard the entry itself.
        qdr_delivery_decref_CT(core, update_ref->dlv, "qdr_link_cleanup_deliveries_CT - remove from updated list");
        qdr_del_delivery_ref(&orphaned_updates, update_ref);
    }

    //
    // Phase 2b: drain the undelivered list.  Each peer of an undelivered
    // delivery is released: through the multicast outbound update when the
    // peer is multicast, otherwise with a plain release followed by an
    // unlink.
    //
    qdr_delivery_t *delivery;
    qdr_delivery_t *other;
    while ((delivery = DEQ_HEAD(orphaned_undelivered)) != 0) {
        DEQ_REMOVE_HEAD(orphaned_undelivered);

        // An inbound undelivered multicast has not been forwarded yet, so it
        // is expected to have no peers.
        assert(!delivery->multicast || qdr_delivery_peer_count_CT(delivery) == 0);

        // NOTE(review): the next peer is fetched after the current peer has
        // been unlinked/updated, unlike the settled loop below which fetches
        // it first - confirm the peer iterator tolerates this.
        for (other = qdr_delivery_first_peer_CT(delivery); other; other = qdr_delivery_next_peer_CT(delivery)) {
            if (!other->multicast) {
                qdr_delivery_release_CT(core, other);
                qdr_delivery_unlink_peers_CT(core, delivery, other);
            } else {
                // 'delivery' is an outgoing multicast: report it to the
                // incoming peer as released and settled.  Per the original
                // author's note this also unlinks the two.
                qdr_delivery_mcast_outbound_update_CT(core, other, delivery, PN_RELEASED, true);
            }
        }

        // Bring the global and per-link delivery counters up to date.
        qdr_delivery_increment_counters_CT(core, delivery);
        qd_nullify_safe_ptr(&delivery->link_sp);

        // Drop the reference the undelivered list held.
        qdr_delivery_decref_CT(core, delivery, "qdr_link_cleanup_deliveries_CT - remove from undelivered list");
    }

    //
    // Phase 2c: drain the unsettled list.
    //
    while ((delivery = DEQ_HEAD(orphaned_unsettled)) != 0) {
        DEQ_REMOVE_HEAD(orphaned_unsettled);

        // Undo the per-address tracking for this delivery.  On shutdown the
        // tracking address was already cleared in phase 1, so this is skipped.
        if (delivery->tracking_addr) {
            delivery->tracking_addr->outstanding_deliveries[delivery->tracking_addr_bit]--;
            delivery->tracking_addr->tracked_deliveries--;

            if (delivery->tracking_addr->tracked_deliveries == 0) {
                qdr_check_addr_CT(core, delivery->tracking_addr);
            }

            delivery->tracking_addr = 0;
        }

        // A message that was not completely received is flagged aborted and
        // its peers are told to continue.
        if (!qdr_delivery_receive_complete(delivery)) {
            qdr_delivery_set_aborted(delivery);
            qdr_delivery_continue_peers_CT(core, delivery, false);
        }

        if (delivery->multicast) {
            // Forward the settlement; the final argument 'true' means settled.
            qdr_delivery_mcast_inbound_update_CT(core, delivery, PN_MODIFIED, true);
        } else {
            // NOTE(review): as in the undelivered loop, the next peer is
            // fetched after the current one has been processed.
            for (other = qdr_delivery_first_peer_CT(delivery); other; other = qdr_delivery_next_peer_CT(delivery)) {
                if (!other->multicast) {
                    // Only an outgoing link marks its peer as failed.
                    if (link->link_direction == QD_OUTGOING) {
                        qdr_delivery_failed_CT(core, other);
                    }
                    qdr_delivery_unlink_peers_CT(core, delivery, other);
                } else {
                    // 'other' is an incoming multicast and 'delivery' is one
                    // of its outgoing deliveries.  Per the original author's
                    // note this also unlinks the two.
                    qdr_delivery_mcast_outbound_update_CT(core, other, delivery, PN_MODIFIED, true);
                }
            }
        }

        // Bring the global and per-link delivery counters up to date.
        qdr_delivery_increment_counters_CT(core, delivery);
        qd_nullify_safe_ptr(&delivery->link_sp);

        // Drop the reference the unsettled list held.
        qdr_delivery_decref_CT(core, delivery, "qdr_link_cleanup_deliveries_CT - remove from unsettled list");
    }

    //
    // Phase 2d: drain the settled list - unlink every peer and drop the
    // list's reference.
    //
    while ((delivery = DEQ_HEAD(orphaned_settled)) != 0) {
        DEQ_REMOVE_HEAD(orphaned_settled);

        if (!qdr_delivery_receive_complete(delivery)) {
            qdr_delivery_set_aborted(delivery);
            qdr_delivery_continue_peers_CT(core, delivery, false);
        }

        // The following peer is looked up before the current one is unlinked.
        qdr_delivery_t *following = 0;
        for (other = qdr_delivery_first_peer_CT(delivery); other; other = following) {
            following = qdr_delivery_next_peer_CT(delivery);
            qdr_delivery_unlink_peers_CT(core, delivery, other);
        }

        // Bring the global and per-link delivery counters up to date.
        qdr_delivery_increment_counters_CT(core, delivery);
        qd_nullify_safe_ptr(&delivery->link_sp);

        // Drop the reference the settled list held.
        qdr_delivery_decref_CT(core, delivery, "qdr_link_cleanup_deliveries_CT - remove from settled list");
    }
}