static void qdr_link_forward_CT()

in src/router_core/transfer.c [515:730]


static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv, qdr_address_t *addr, bool more)
{
    qdr_link_t *dlv_link = qdr_delivery_link(dlv);

    assert(dlv_link == link);

    if (!dlv_link)
        return;

    if (addr
        && addr == link->owning_addr
        && qdr_addr_path_count_CT(addr) == 0
        && (link->fallback || qdr_addr_path_count_CT(addr->fallback) == 0)) {
        //
        // We are trying to forward a delivery on an address that has no outbound paths
        // AND the incoming link is targeted (not anonymous).
        //
        // We shall release the delivery (it is currently undeliverable). Since
        // there are no receivers we will try to drain credit to prevent the
        // sender from attempting to send more to this address.
        //
        if (dlv->settled) {
            // Increment the presettled_dropped_deliveries on the in_link
            link->dropped_presettled_deliveries++;
            if (dlv_link->link_type == QD_LINK_ENDPOINT)
                core->dropped_presettled_deliveries++;
        }

        //
        // Note if the message was pre-settled we still call the
        // qdr_delivery_release_CT so if this delivery is multi-frame we can
        // restart receiving the delivery in case it is stalled. Note that
        // messages will not *actually* be released in this case because these
        // are presettled messages.
        //
        qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery forward:  qdr_link_forward_CT (qdr_addr_path_count_CT(addr) == 0): released dlv", DLV_ARGS(dlv));
        qdr_delivery_release_CT(core, dlv);

        //
        // Credit update: since this is a targeted link to an address for which
        // there is no consumers then do not replenish credit - drain instead.
        // However edge is a special snowflake which always has credit available.
        //
        if (link->edge) {
            qdr_link_issue_credit_CT(core, link, 1, false);
        } else {
            qdr_link_issue_credit_CT(core, link, 0, true);  // drain
            link->credit_pending++;
        }

        qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (no path)");
        return;
    }

    int fanout = 0;

    dlv->multicast = qdr_is_addr_treatment_multicast(addr);

    if (addr) {
        fanout = qdr_forward_message_CT(core, addr, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL);
        if (link->link_type != QD_LINK_CONTROL && link->link_type != QD_LINK_ROUTER) {
            if (!link->fallback)
                addr->deliveries_ingress++;

            if (qdr_connection_route_container(link->conn)) {
                addr->deliveries_ingress_route_container++;
                core->deliveries_ingress_route_container++;
            }

        }
    } else {
        //
        // There is no address that we can send this delivery to, which means
        // the addr was not found in our hash table. This can be because there
        // were no receivers or because the address was not defined in the
        // config file.
        //

        qd_address_treatment_t trt = core->qd->default_treatment;
        if (dlv->to_addr) {
            qdr_address_config_t *ignore = 0;
            trt = qdr_treatment_for_address_hash_with_default_CT(core,
                                                                 dlv->to_addr,
                                                                 trt,
                                                                 &ignore);
        }

        if (trt == QD_TREATMENT_UNAVAILABLE) {
            //
            // The treatment for these addresses is set to be unavailable, we
            // stop trying to forward it.  If the link is a locally attached client
            // we reject the message if the link is not anonymous as per the
            // documentation of the router's defaultTreatment=unavailable.  We
            // simply release it for other link types as the message did have a
            // destination at some point (it was forwarded to this router after
            // all) - the loss of the destination may be temporary.
            //
            if (link->link_type == QD_LINK_ENDPOINT) {
                qdr_error_t *error = qdr_error(QD_AMQP_COND_NOT_FOUND, "Deliveries cannot be sent to an unavailable address");
                qdr_delivery_reject_CT(core, dlv, error);
                if (qdr_link_is_anonymous(link)) {
                    qdr_link_issue_credit_CT(core, link, 1, false);
                } else {
                    // cannot forward on this targeted link.  withhold credit and drain
                    qdr_link_issue_credit_CT(core, link, 0, true);
                }
            } else {
                qdr_delivery_release_CT(core, dlv);
                qdr_link_issue_credit_CT(core, link, 1, false);
            }
            //
            // We will not detach this link because this could be anonymous sender. We don't know
            // which address the sender will be sending to next
            // If this was not an anonymous sender, the initial attach would have been rejected if the target address was unavailable.
            //
            qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (treatment unavailable)");
            return;
        }
    }

    //
    // If the anonymous delivery could not be sent anywhere (fanout = 0) and it is not multicasted, try sending it over
    // the anonymous link.
    //
    if (fanout == 0 && !dlv->multicast && link->owning_addr == 0 && dlv->to_addr != 0) {
        if (core->edge_conn_addr && link->conn->role != QDR_ROLE_EDGE_CONNECTION) {
            qdr_address_t *sender_address = core->edge_conn_addr(core->edge_context);
            if (sender_address && sender_address != addr)
                fanout += qdr_forward_message_CT(core, sender_address, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL);
        }
    }

    //
    // If the fanout is still zero, check to see if there is a fallback address and
    // route via the fallback if present.  Don't do fallback forwarding if this link is
    // itself associated with a fallback destination.
    //
    if (fanout == 0 && !!addr && !!addr->fallback && !link->fallback) {
        const char *key = (const char*) qd_hash_key_by_handle(addr->fallback->hash_handle);
        qd_message_set_to_override_annotation(dlv->msg, key + 2);
        qd_message_set_phase_annotation(dlv->msg, key[1] - '0');
        fanout = qdr_forward_message_CT(core, addr->fallback, dlv->msg, dlv, false, link->link_type == QD_LINK_CONTROL);
        if (fanout > 0) {
            addr->deliveries_redirected++;
            core->deliveries_redirected++;
        }
    }

    if (fanout == 0) {
        //
        // Message was not delivered, drop the delivery.
        //
        // If the delivery is not settled, release it.
        //
        if (!dlv->settled) {
        	qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery forward:  qdr_link_forward_CT(fanout == 0): released dlv", DLV_ARGS(dlv));
            qdr_delivery_release_CT(core, dlv);
        }
        else {
            link->dropped_presettled_deliveries++;
            if (dlv_link->link_type == QD_LINK_ENDPOINT)
                core->dropped_presettled_deliveries++;
        }

        //
        // Decrementing the delivery ref count for the action
        //
        qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (1)");
        qdr_link_issue_credit_CT(core, link, 1, false);
    } else if (fanout > 0) {
        if (dlv->settled) {
            //
            // The delivery is settled.  Keep it off the unsettled list and issue
            // replacement credit for it now.
            //
            qdr_link_issue_credit_CT(core, link, 1, false);
            if (!more) {
                //
                // This decref is for the action ref
                //
                qdr_delivery_decref_CT(core, dlv, "qdr_link_forward_CT - removed from action (2)");
            }
            else {
                //
                // The message is still coming through since receive_complete is false. We have to put this delivery in the settled list.
                // We need to do this because we have linked this delivery to a peer.
                // If this connection goes down, we will have to unlink peer so that peer knows that its peer is not-existent anymore
                // and need to tell the other side that the message has been aborted.
                //

                //
                // Again, don't bother decrementing then incrementing the ref_count, we are still using the action ref count
                //
                DEQ_INSERT_TAIL(link->settled, dlv);
                dlv->where = QDR_DELIVERY_IN_SETTLED;
                qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery transfer:  qdr_link_forward_CT: action-list -> settled-list", DLV_ARGS(dlv));
            }
        } else {
            //
            // Again, don't bother decrementing then incrementing the ref_count
            //
            DEQ_INSERT_TAIL(link->unsettled, dlv);
            dlv->where = QDR_DELIVERY_IN_UNSETTLED;
            qd_log(core->log, QD_LOG_DEBUG, DLV_FMT" Delivery transfer:  qdr_link_forward_CT: action-list -> unsettled-list", DLV_ARGS(dlv));

            //
            // If the delivery was received on an inter-router link, issue the credit
            // now.  We don't want to tie inter-router link flow control to unsettled
            // deliveries because it increases the risk of credit starvation if there
            // are many addresses sharing the link.
            //
            if (link->link_type == QD_LINK_CONTROL || link->link_type == QD_LINK_ROUTER || link->edge)
                qdr_link_issue_credit_CT(core, link, 1, false);
        }
    }
}