static void qdr_link_react_to_first_attach_CT()

in src/router_core/modules/address_lookup_client/address_lookup_client.c [346:449]


/**
 * Decide how to answer the first attach of a link and carry out that decision.
 *
 * The 'fallback' flag is recorded on the link, then exactly one of the
 * following outcomes is chosen (checked in this order):
 *
 *   1. core_endpoint - the attach is handed to qdrc_endpoint_do_bound_attach_CT.
 *   2. unavailable   - the link is detached: with the coordinator
 *                      precondition-failed condition when the relevant terminus
 *                      is a coordinator and there is no address, otherwise with
 *                      a "Node not found" error.
 *   3. no address    - the link is detached (no route to destination).
 *   4. link_route    - the attach is forwarded via qdr_forward_attach_CT, or the
 *                      link is detached if that is not permitted or fails.
 *   5. incoming link with a coordinator target - the attach is answered with a
 *                      null target and then detached.
 *   6. otherwise     - the link is bound to the address and the attach is
 *                      answered; initial credit and an edge event may follow.
 *
 * The "relevant terminus" is the target for an incoming link and the source
 * for any other direction.
 *
 * @param core          Router core handle
 * @param conn          Connection; its role and connection_info are consulted
 *                      on the link-route path
 * @param addr          Address for the link, may be null
 * @param link          The link being attached
 * @param dir           Direction of the link
 * @param source        Source terminus; owned by this function and freed before
 *                      returning unless ownership was passed to a callee
 * @param target        Target terminus; same ownership rule as 'source'
 * @param link_route    True if the destination is link-routed
 * @param unavailable   True if the link must be refused as unavailable
 * @param core_endpoint True if the attach is to be handled by a core endpoint
 * @param fallback      Value stored in link->fallback
 */
static void qdr_link_react_to_first_attach_CT(qdr_core_t       *core,
                                              qdr_connection_t *conn,
                                              qdr_address_t    *addr,
                                              qdr_link_t       *link,
                                              qd_direction_t    dir,
                                              qdr_terminus_t   *source,  // must free when done
                                              qdr_terminus_t   *target,  // must free when done
                                              bool              link_route,
                                              bool              unavailable,
                                              bool              core_endpoint,
                                              bool              fallback)
{
    link->fallback = fallback;

    if (core_endpoint) {
        //
        // Both termini now belong to qdrc_endpoint_do_bound_attach_CT.
        //
        qdrc_endpoint_do_bound_attach_CT(core, addr, link, source, target);
        source = 0;
        target = 0;
        goto release_termini;
    }

    if (unavailable) {
        //
        // Refuse the link.  A coordinator terminus without an address gets the
        // coordinator-specific condition; everything else gets "not found".
        //
        qdr_terminus_t *relevant = (dir == QD_INCOMING) ? target : source;

        if (qdr_terminus_is_coordinator(relevant) && !addr) {
            qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_COORDINATOR_PRECONDITION_FAILED, true);
        } else {
            qdr_link_outbound_detach_CT(core, link, qdr_error(QD_AMQP_COND_NOT_FOUND, "Node not found"), 0, true);
        }
        goto release_termini;
    }

    if (!addr) {
        //
        // Nothing to route to: reject the link.
        //
        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
        goto release_termini;
    }

    if (link_route) {
        //
        // Link-routed destination: the attach is forwarded to the next hop,
        // unless the terminus asks to survive a disconnect and resumable
        // link routes are not allowed.
        //
        qdr_terminus_t *relevant = (dir == QD_INCOMING) ? target : source;

        if (qdr_terminus_survives_disconnect(relevant) && !core->qd->allow_resumable_link_route) {
            qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_INVALID_LINK_EXPIRATION, true);
            goto release_termini;
        }

        if (conn->role != QDR_ROLE_INTER_ROUTER && conn->connection_info) {
            link->disambiguated_name = disambiguated_link_name(conn->connection_info, link->name);
        }

        if (qdr_forward_attach_CT(core, addr, link, source, target)) {
            //
            // Forwarded: qdr_forward_attach_CT took ownership of both termini.
            //
            source = 0;
            target = 0;
        } else {
            qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
        }
        goto release_termini;
    }

    if (dir == QD_INCOMING && qdr_terminus_is_coordinator(target)) {
        //
        // A coordinator target that was not link-routed anywhere.  The router
        // cannot coordinate transactions itself, so the link is refused:
        // first an attach response with a null target (announcing the refusal
        // and the detach that follows), then a detach carrying
        // amqp:precondition-failed.
        //
        // The source is handed to qdr_link_outbound_second_attach_CT; the
        // target stays with us and is released below.
        //
        qdr_link_outbound_second_attach_CT(core, link, source, 0);
        source = 0;

        qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_COORDINATOR_PRECONDITION_FAILED, true);
        goto release_termini;
    }

    //
    // Ordinary case.  Tie the link to its address so that deliveries arriving
    // on the link need no further address lookup, then answer the attach.
    // Both termini are handed to qdr_link_outbound_second_attach_CT.
    //
    qdr_core_bind_address_link_CT(core, addr, link);
    qdr_link_outbound_second_attach_CT(core, link, source, target);
    source = 0;
    target = 0;

    if (dir == QD_INCOMING) {
        //
        // Initial credit is granted only when the address has somewhere to
        // send to (subscriptions, rlinks or rnodes), when it is an exchange
        // (no subscribers allowed), or when its fallback address has
        // somewhere to send to.
        //
        bool grant_credit = DEQ_SIZE(addr->subscriptions)
            || DEQ_SIZE(addr->rlinks)
            || qd_bitmask_cardinality(addr->rnodes)
            || !!addr->exchange;

        if (!grant_credit && !!addr->fallback) {
            grant_credit = DEQ_SIZE(addr->fallback->subscriptions)
                || DEQ_SIZE(addr->fallback->rlinks)
                || qd_bitmask_cardinality(addr->fallback->rnodes);
        }

        if (grant_credit) {
            qdr_link_issue_credit_CT(core, link, link->capacity, false);
        }
    }

    //
    // Announce links that arrived over an edge connection.
    //
    if (link->conn->role == QDR_ROLE_EDGE_CONNECTION) {
        qdrc_event_link_raise(core, QDRC_EVENT_LINK_EDGE_DATA_ATTACHED, link);
    }

release_termini:
    //
    // Whatever was not handed off to a callee is still ours to free.
    //
    if (source) {
        qdr_terminus_free(source);
    }
    if (target) {
        qdr_terminus_free(target);
    }
}