in src/router_core/transfer.c [515:730]
static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr, bool more)
{
qdr_link_t *dlv_link = qdr_delivery_link(dlv);
assert(dlv_link == link);
if (!dlv_link)
return;
if (addr
&& addr == link->owning_addr
&& qdr_addr_path_count_CT(addr) == 0
&& (link->fallback || qdr_addr_path_count_CT(addr->fallback) == 0)) {
//
// We are trying to forward a delivery on an address that has no outbound paths
// AND the incoming link is targeted (not anonymous).
//
// We shall release the delivery (it is currently undeliverable). Since
// there are no receivers we will try to drain credit to prevent the
// sender from attempting to send more to this address.
//
if (dlv->settled) {
// Increment the presettled_dropped_deliveries on the in_link
link->dropped_presettled_deliveries++;
if (dlv_link->link_type == QD_LINK_ENDPOINT)
core->dropped_presettled_deliveries++;
}
//
// Note if the message was pre-settled we still call the
// qdr_delivery_release_CT so if this delivery is multi-frame we can
// restart receiving the delivery in case it is stalled. Note that
// messages will not *actually* be released in this case because these
// are presettled messages.
//
qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery forward: qdr_link_forward_CT (qdr_addr_path_count_CT(addr) == 0): released dlv", DLV_ARGS(dlv));
qdr_delivery_release_CT(core, dlv);
//
// Credit update: since this is a targeted link to an address for which
// there is no consumers then do not replenish credit - drain instead.
// However edge is a special snowflake which always has credit available.
//
if (link->edge) {
qdr_link_issue_credit_CT(core, link, 1, false);
} else {
qdr_link_issue_credit_CT(core, link, 0, true); // drain
link->credit_pending++;
}
qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (no path)");
return;
}
int fanout = 0;
dlv->multicast = qdr_is_addr_treatment_multicast(addr);
if (addr) {
fanout = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL);
if (link->link_type != QD_LINK_CONTROL && link->link_type != QD_LINK_ROUTER) {
if (!link->fallback)
addr->deliveries_ingress++;
if (qdr_connection_route_container(link->conn)) {
addr->deliveries_ingress_route_container++;
core->deliveries_ingress_route_container++;
}
}
} else {
//
// There is no address that we can send this delivery to, which means
// the addr was not found in our hash table. This can be because there
// were no receivers or because the address was not defined in the
// config file.
//
qd_address_treatment_t trt = core->qd->default_treatment;
if (dlv->to_addr) {
qdr_address_config_t *ignore = 0;
trt = qdr_treatment_for_address_hash_with_default_CT(core,
dlv->to_addr,
trt,
&ignore);
}
if (trt == QD_TREATMENT_UNAVAILABLE) {
//
// The treatment for these addresses is set to be unavailable, we
// stop trying to forward it. If the link is a locally attached client
// we reject the message if the link is not anonymous as per the
// documentation of the router's defaultTreatment=unavailable. We
// simply release it for other link types as the message did have a
// destination at some point (it was forwarded to this router after
// all) - the loss of the destination may be temporary.
//
if (link->link_type == QD_LINK_ENDPOINT) {
qdr_error_t *error = qdr_error(QD_AMQP_COND_NOT_FOUND, "Deliveries cannot be sent to an unavailable address");
qdr_delivery_reject_CT(core, dlv, error);
if (qdr_link_is_anonymous(link)) {
qdr_link_issue_credit_CT(core, link, 1, false);
} else {
// cannot forward on this targeted link. withhold credit and drain
qdr_link_issue_credit_CT(core, link, 0, true);
}
} else {
qdr_delivery_release_CT(core, dlv);
qdr_link_issue_credit_CT(core, link, 1, false);
}
//
// We will not detach this link because this could be anonymous sender. We don't know
// which address the sender will be sending to next
// If this was not an anonymous sender, the initial attach would have been rejected if the target address was unavailable.
//
qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (treatment unavailable)");
return;
}
}
//
// If the anonymous delivery could not be sent anywhere (fanout = 0) and it is not multicasted, try sending it over
// the anonymous link.
//
if (fanout == 0 && !dlv->multicast && link->owning_addr == 0 && dlv->to_addr != 0) {
if (core->edge_conn_addr && link->conn->role != QDR_ROLE_EDGE_CONNECTION) {
qdr_address_t *sender_address = core->edge_conn_addr(core->edge_context);
if (sender_address && sender_address != addr)
fanout += qdr_forward_message_CT(core, sender_address, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL);
}
}
//
// If the fanout is still zero, check to see if there is a fallback address and
// route via the fallback if present. Don't do fallback forwarding if this link is
// itself associated with a fallback destination.
//
if (fanout == 0 && !!addr && !!addr->fallback && !link->fallback) {
const char *key = (const char*) qd_hash_key_by_handle(addr->fallback->hash_handle);
qd_message_set_to_override_annotation(dlv->msg, key + 2);
qd_message_set_phase_annotation(dlv->msg, key[1] - '0');
fanout = qdr_forward_message_CT(core, addr->fallback, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL);
if (fanout > 0) {
addr->deliveries_redirected++;
core->deliveries_redirected++;
}
}
if (fanout == 0) {
//
// Message was not delivered, drop the delivery.
//
// If the delivery is not settled, release it.
//
if (!dlv->settled) {
qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery forward: qdr_link_forward_CT(fanout == 0): released dlv", DLV_ARGS(dlv));
qdr_delivery_release_CT(core, dlv);
}
else {
link->dropped_presettled_deliveries++;
if (dlv_link->link_type == QD_LINK_ENDPOINT)
core->dropped_presettled_deliveries++;
}
//
// Decrementing the delivery ref count for the action
//
qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (1)");
qdr_link_issue_credit_CT(core, link, 1, false);
} else if (fanout > 0) {
if (dlv->settled) {
//
// The delivery is settled. Keep it off the unsettled list and issue
// replacement credit for it now.
//
qdr_link_issue_credit_CT(core, link, 1, false);
if (!more) {
//
// This decref is for the action ref
//
qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (2)");
}
else {
//
// The message is still coming through since receive_complete is false. We have to put this delivery in the settled list.
// We need to do this because we have linked this delivery to a peer.
// If this connection goes down, we will have to unlink peer so that peer knows that its peer is not-existent anymore
// and need to tell the other side that the message has been aborted.
//
//
// Again, don't bother decrementing then incrementing the ref_count, we are still using the action ref count
//
DEQ_INSERT_TAIL(link->settled, dlv);
dlv->where = QDR_DELIVERY_IN_SETTLED;
qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery transfer: qdr_link_forward_CT: action-list -> settled-list", DLV_ARGS(dlv));
}
} else {
//
// Again, don't bother decrementing then incrementing the ref_count
//
DEQ_INSERT_TAIL(link->unsettled, dlv);
dlv->where = QDR_DELIVERY_IN_UNSETTLED;
qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery transfer: qdr_link_forward_CT: action-list -> unsettled-list", DLV_ARGS(dlv));
//
// If the delivery was received on an inter-router link, issue the credit
// now. We don't want to tie inter-router link flow control to unsettled
// deliveries because it increases the risk of credit starvation if there
// are many addresses sharing the link.
//
if (link->link_type == QD_LINK_CONTROL || link->link_type == QD_LINK_ROUTER || link->edge)
qdr_link_issue_credit_CT(core, link, 1, false);
}
}
}