in src/router_core/connections.c [791:996]
//
// Tear down every delivery list attached to 'link'.
//
// The work happens in two phases:
//
//   1. While holding conn->work_lock, the link's updated_deliveries,
//      undelivered, unsettled and settled lists are handed over to local
//      lists with DEQ_MOVE.  Each delivery on the latter three lists is
//      marked QDR_DELIVERY_NOWHERE and has its link_work released and cleared.
//
//   2. After the lock is dropped, each local list is drained in turn
//      (updated, undelivered, unsettled, settled).  For every delivery the
//      counters are updated with qdr_delivery_increment_counters_CT, its
//      link_sp safe pointer is nullified, and the reference held by the list
//      is dropped with qdr_delivery_decref_CT.  The undelivered, unsettled
//      and settled loops additionally notify and/or unlink peer deliveries.
//
// Parameters:
//   core        - router core; core->dropped_presettled_deliveries is
//                 incremented for each presettled delivery found on the
//                 undelivered list.
//   conn        - connection whose work_lock guards the link's lists.
//   link        - link whose delivery lists are cleaned up; link->link_direction
//                 is consulted when handling peers of unsettled deliveries.
//   on_shutdown - when true, tracking_addr is cleared on every delivery during
//                 phase 1, which causes the per-address accounting in the
//                 unsettled loop to be skipped.
//
// NOTE(review): the _CT suffix suggests this must run on the core thread -
// confirm against the project's conventions.
//
void qdr_link_cleanup_deliveries_CT(qdr_core_t *core, qdr_connection_t *conn, qdr_link_t *link, bool on_shutdown)
{
//
// Clean up the lists of deliveries on this link
//
qdr_delivery_ref_list_t updated_deliveries;
qdr_delivery_list_t undelivered;
qdr_delivery_list_t unsettled;
qdr_delivery_list_t settled;
//
// Phase 1: take ownership of the link's lists while holding the work lock.
// NOTE(review): DEQ_MOVE presumably leaves the link's own lists empty - confirm
// against the macro definition.
//
sys_mutex_lock(conn->work_lock);
DEQ_MOVE(link->updated_deliveries, updated_deliveries);
DEQ_MOVE(link->undelivered, undelivered);
//
// Undelivered: count dropped presettled deliveries, mark each delivery as
// being on no list, and release its link work item.
//
qdr_delivery_t *d = DEQ_HEAD(undelivered);
while (d) {
assert(d->where == QDR_DELIVERY_IN_UNDELIVERED);
if (d->presettled)
core->dropped_presettled_deliveries++;
d->where = QDR_DELIVERY_NOWHERE;
// On shutdown, forget the tracked address so it is not touched later
if (on_shutdown)
d->tracking_addr = 0;
qdr_link_work_release(d->link_work);
d->link_work = 0;
d = DEQ_NEXT(d);
}
//
// Unsettled: same marking and link work release as above (no presettled count).
//
DEQ_MOVE(link->unsettled, unsettled);
d = DEQ_HEAD(unsettled);
while (d) {
assert(d->where == QDR_DELIVERY_IN_UNSETTLED);
d->where = QDR_DELIVERY_NOWHERE;
qdr_link_work_release(d->link_work);
d->link_work = 0;
// Clearing tracking_addr here makes the address accounting below a no-op
if (on_shutdown)
d->tracking_addr = 0;
d = DEQ_NEXT(d);
}
//
// Settled: same marking and link work release as above.
//
DEQ_MOVE(link->settled, settled);
d = DEQ_HEAD(settled);
while (d) {
assert(d->where == QDR_DELIVERY_IN_SETTLED);
d->where = QDR_DELIVERY_NOWHERE;
qdr_link_work_release(d->link_work);
d->link_work = 0;
if (on_shutdown)
d->tracking_addr = 0;
d = DEQ_NEXT(d);
}
sys_mutex_unlock(conn->work_lock);
//
// Phase 2: the local lists are drained without holding the work lock.
//
//
// Free all the 'updated' references
//
qdr_delivery_ref_t *ref = DEQ_HEAD(updated_deliveries);
while (ref) {
//
// Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
//
qdr_delivery_increment_counters_CT(core, ref->dlv);
qd_nullify_safe_ptr(&ref->dlv->link_sp);
//
// Now our reference
//
qdr_delivery_decref_CT(core, ref->dlv, "qdr_link_cleanup_deliveries_CT - remove from updated list");
// Remove the reference entry itself, then continue with the new list head
qdr_del_delivery_ref(&updated_deliveries, ref);
ref = DEQ_HEAD(updated_deliveries);
}
//
// Free the undelivered deliveries. If this is an incoming link, the
// undelivereds can simply be destroyed. If it's an outgoing link, the
// undelivereds' peer deliveries need to be released.
//
qdr_delivery_t *dlv = DEQ_HEAD(undelivered);
qdr_delivery_t *peer;
while (dlv) {
DEQ_REMOVE_HEAD(undelivered);
// expect: an inbound undelivered multicast should
// have no peers (has not been forwarded yet)
assert(dlv->multicast
? qdr_delivery_peer_count_CT(dlv) == 0
: true);
//
// NOTE(review): the next peer is fetched only after the current peer has
// been updated/unlinked, whereas the settled loop at the end of this
// function fetches the next peer before unlinking.  Confirm that the
// first/next peer iterator tolerates removal of the current peer.
//
peer = qdr_delivery_first_peer_CT(dlv);
while (peer) {
if (peer->multicast) {
//
// dlv is outgoing mcast - tell its incoming peer that it has
// been released and settled. This will unlink these peers.
//
qdr_delivery_mcast_outbound_update_CT(core, peer, dlv, PN_RELEASED, true);
}
else {
// Non-multicast peer: release it, then break the dlv <-> peer linkage
qdr_delivery_release_CT(core, peer);
qdr_delivery_unlink_peers_CT(core, dlv, peer);
}
peer = qdr_delivery_next_peer_CT(dlv);
}
//
// Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
//
qdr_delivery_increment_counters_CT(core, dlv);
qd_nullify_safe_ptr(&dlv->link_sp);
//
// Now the undelivered-list reference
//
qdr_delivery_decref_CT(core, dlv, "qdr_link_cleanup_deliveries_CT - remove from undelivered list");
// dlv is not used after the decref; continue with the new list head
dlv = DEQ_HEAD(undelivered);
}
//
// Free the unsettled deliveries.
//
dlv = DEQ_HEAD(unsettled);
while (dlv) {
DEQ_REMOVE_HEAD(unsettled);
//
// If the delivery is still tracked against an address (never the case when
// on_shutdown was set, see phase 1), undo its contribution to that address's
// counters and let qdr_check_addr_CT examine the address once nothing is
// tracked against it any more.
//
if (dlv->tracking_addr) {
dlv->tracking_addr->outstanding_deliveries[dlv->tracking_addr_bit]--;
dlv->tracking_addr->tracked_deliveries--;
if (dlv->tracking_addr->tracked_deliveries == 0)
qdr_check_addr_CT(core, dlv->tracking_addr);
dlv->tracking_addr = 0;
}
//
// The delivery was not completely received: flag it as aborted and run
// qdr_delivery_continue_peers_CT (presumably so its peers observe the abort -
// TODO confirm).
//
if (!qdr_delivery_receive_complete(dlv)) {
qdr_delivery_set_aborted(dlv);
qdr_delivery_continue_peers_CT(core, dlv, false);
}
if (dlv->multicast) {
//
// forward settlement
//
qdr_delivery_mcast_inbound_update_CT(core, dlv,
PN_MODIFIED,
true); // true == settled
} else {
//
// NOTE(review): as in the undelivered loop, the next peer is fetched after
// the current peer has been updated/unlinked - confirm the iterator allows it.
//
peer = qdr_delivery_first_peer_CT(dlv);
while (peer) {
if (peer->multicast) {
//
// peer is incoming multicast and dlv is one of its corresponding
// outgoing deliveries. This will unlink these peers.
//
qdr_delivery_mcast_outbound_update_CT(core, peer, dlv, PN_MODIFIED, true);
} else {
// The peer is marked failed only when the link being cleaned up is outgoing
if (link->link_direction == QD_OUTGOING)
qdr_delivery_failed_CT(core, peer);
qdr_delivery_unlink_peers_CT(core, dlv, peer);
}
peer = qdr_delivery_next_peer_CT(dlv);
}
}
//
// Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
//
qdr_delivery_increment_counters_CT(core, dlv);
qd_nullify_safe_ptr(&dlv->link_sp);
//
// Now the unsettled-list reference
//
qdr_delivery_decref_CT(core, dlv, "qdr_link_cleanup_deliveries_CT - remove from unsettled list");
dlv = DEQ_HEAD(unsettled);
}
//
// Free/unlink/decref the settled deliveries.
//
dlv = DEQ_HEAD(settled);
while (dlv) {
DEQ_REMOVE_HEAD(settled);
// Same incomplete-receive handling as for unsettled deliveries
if (!qdr_delivery_receive_complete(dlv)) {
qdr_delivery_set_aborted(dlv);
qdr_delivery_continue_peers_CT(core, dlv, false);
}
//
// Peers of a settled delivery are only unlinked; no disposition is sent to
// them here.  The next peer is fetched before the current one is unlinked.
//
peer = qdr_delivery_first_peer_CT(dlv);
qdr_delivery_t *next_peer = 0;
while (peer) {
next_peer = qdr_delivery_next_peer_CT(dlv);
qdr_delivery_unlink_peers_CT(core, dlv, peer);
peer = next_peer;
}
//
// Updates global and link level delivery counters like presettled_deliveries, accepted_deliveries, released_deliveries etc
//
qdr_delivery_increment_counters_CT(core, dlv);
qd_nullify_safe_ptr(&dlv->link_sp);
// This decref is for removing the delivery from the settled list
qdr_delivery_decref_CT(core, dlv, "qdr_link_cleanup_deliveries_CT - remove from settled list");
dlv = DEQ_HEAD(settled);
}
}