static bool AMQP_rx_handler()

in src/router_node.c [411:873]


static bool AMQP_rx_handler(void* context, qd_link_t *link)
{
    qd_router_t    *router  = (qd_router_t*) context;
    pn_link_t      *pn_link = qd_link_pn(link);

    assert(pn_link);

    if (!pn_link)
        return false;

    // ensure the current delivery is readable
    pn_delivery_t *pnd = pn_link_current(pn_link);
    if (!pnd)
        return false;

    qd_connection_t  *conn   = qd_link_connection(link);

    // DISPATCH-1628 DISPATCH-975 exit if router already closed this connection
    if (conn->closed_locally) {
        return false;
    }

    qdr_delivery_t *delivery = qdr_node_delivery_qdr_from_pn(pnd);
    bool       next_delivery = false;

    //
    // Receive the message into a local representation.
    //
    qd_message_t   *msg   = qd_message_receive(pnd);
    bool receive_complete = qd_message_receive_complete(msg);

    if (!qd_message_oversize(msg)) {
        // message not rejected as oversize
        if (receive_complete) {
            //
            // The entire message has been received and we are ready to consume the delivery by calling pn_link_advance().
            //
            pn_link_advance(pn_link);
            next_delivery = pn_link_current(pn_link) != 0;
        }

        if (qd_message_is_discard(msg)) {
            //
            // Message has been marked for discard, no further processing necessary
            //
            if (receive_complete) {
                // If this discarded delivery has already been settled by proton,
                // set the presettled flag on the delivery to true if it is not already true.
                // Since the entire message has already been received, we directly call the
                // function to set the pre-settled flag since we cannot go thru the core-thread
                // to do this since the delivery has been discarded.
                // Discarded streaming deliveries are not put thru the core thread via the continue action.
                if (pn_delivery_settled(pnd))
                    qdr_delivery_set_presettled(delivery);

                uint64_t local_disp = qdr_delivery_disposition(delivery);
                //
                // Call pn_delivery_update only if the local disposition is different than the pn_delivery's local disposition.
                // This will make sure we call pn_delivery_update only when necessary.
                //
                if (local_disp != 0 && local_disp != pn_delivery_local_state(pnd)) {
                    //
                    // DISPATCH-1626 - This enables pn_delivery_update() and pn_delivery_settle() to be called back to back in the same function call.
                    // CORE_delivery_update() will handle most of the other cases where we need to call pn_delivery_update() followed by pn_delivery_settle().
                    //
                    pn_delivery_update(pnd, local_disp);
                }

                // note: expected that the code that set discard has handled
                // setting disposition and updating flow!
                pn_delivery_settle(pnd);
                if (delivery) {
                    // if delivery already exists then the core thread discarded this
                    // delivery, it will eventually free the qdr_delivery_t and its
                    // associated message - do not free it here.
                    qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
                } else {
                    qd_message_free(msg);
                }
            }
            return next_delivery;
        }
    } else {
        // message is oversize
        if (receive_complete) {
            // set condition, reject, and settle the incoming delivery
            _reject_delivery(pnd, QD_AMQP_COND_MESSAGE_SIZE_EXCEEDED, QD_AMQP_COND_OVERSIZE_DESCRIPTION);
            pn_delivery_settle(pnd);
            // close the link
            pn_link_close(pn_link);
            // set condition and close the connection
            pn_connection_t * pn_conn = qd_connection_pn(conn);
            pn_condition_t * cond = pn_connection_condition(pn_conn);
            (void) pn_condition_set_name(       cond, QD_AMQP_COND_CONNECTION_FORCED);
            (void) pn_condition_set_description(cond, QD_AMQP_COND_OVERSIZE_DESCRIPTION);
            pn_connection_close(pn_conn);
            if (!delivery) {
                // this message has not been forwarded yet, so it will not be
                // cleaned up when the link is freed.
                qd_message_free(msg);
            }
            // stop all message reception on this connection
            conn->closed_locally = true;
        }
        return false;
        // oversize messages are not processed any further
    }

    //
    // If the delivery already exists we've already passed it to the core (2nd
    // frame for a multi-frame transfer). Simply continue.
    //

    if (delivery) {
        qdr_delivery_continue(router->router_core, delivery, pn_delivery_settled(pnd));
        return next_delivery;
    }

    //
    // No pre-existing delivery means we're starting a new delivery or
    // continuing a delivery that has not accumulated enough of the message
    // for forwarding.
    //

    qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
    if (!rlink) {
        // receive link was closed or deleted - can't be forwarded
        // so no use setting disposition or adding flow
        qd_message_set_discard(msg, true);
        if (receive_complete) {
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        }
        return next_delivery;
    }

    //
    // Validate the content of the delivery as an AMQP message.  This is done
    // partially, only to validate that we can find the fields we need to route
    // the message.
    //
    // If per-message tracing is configured then validate the sections
    // necessary for logging.
    //
    // link-routing: it is not necessary to validate any sections, but doing so
    // will force a message validity check and ensure the message is not null.
    //
    // If the link is anonymous, we must validate through the message
    // properties to find the 'to' field.  If the link is not anonymous, we
    // don't need the 'to' field as we will be using the address from the link
    // target.
    //
    // Check if the user id needs to be validated (see below). If it does we
    // need to validate the message properties section.
    //
    // Otherwise check the message annotations for router annotations necessary
    // for forwarding.
    //
    const bool link_routed    = qdr_link_is_routed(rlink);
    const bool anonymous_link = qdr_link_is_anonymous(rlink);
    const bool check_user     = (conn->policy_settings && !conn->policy_settings->spec.allowUserIdProxy);
    const qd_server_config_t *cf = qd_connection_config(conn);
    const qd_message_depth_t depth = (cf && cf->log_bits != 0) ? QD_DEPTH_APPLICATION_PROPERTIES
        : (link_routed) ? QD_DEPTH_HEADER
        : (anonymous_link || check_user) ? QD_DEPTH_PROPERTIES
        : QD_DEPTH_MESSAGE_ANNOTATIONS;

    const qd_message_depth_status_t depth_valid = qd_message_check_depth(msg, depth);
    switch (depth_valid) {
    case QD_MESSAGE_DEPTH_INVALID:
        qd_log(router->log_source, QD_LOG_DEBUG,
               "[C%"PRIu64"][L%"PRIu64"] Incoming message validation failed - rejected",
               conn->connection_id,
               qd_link_link_id(link));
        qd_message_set_discard(msg, true);
        pn_link_flow(pn_link, 1);
        _reject_delivery(pnd, QD_AMQP_COND_DECODE_ERROR, "invalid message format");
        pn_delivery_settle(pnd);
        qd_message_free(msg);
        return next_delivery;
    case QD_MESSAGE_DEPTH_INCOMPLETE:
        return false;  // stop rx processing
    case QD_MESSAGE_DEPTH_OK:
        break;
    }

    // Handle the link-routed case
    //
    if (link_routed) {
        pn_delivery_tag_t dtag = pn_delivery_tag(pnd);

        if (dtag.size > QDR_DELIVERY_TAG_MAX) {
            qd_log(router->log_source, QD_LOG_DEBUG, "link route delivery failure: msg tag size exceeded %zd (max=%d)",
                   dtag.size, QDR_DELIVERY_TAG_MAX);
            qd_message_set_discard(msg, true);
            pn_link_flow(pn_link, 1);
            _reject_delivery(pnd, QD_AMQP_COND_INVALID_FIELD, "delivery tag length exceeded");
            if (receive_complete) {
                pn_delivery_settle(pnd);
                qd_message_free(msg);
            }
            return next_delivery;
        }

        log_link_message(conn, pn_link, msg);
        delivery = qdr_link_deliver_to_routed_link(rlink,
                                                   msg,
                                                   pn_delivery_settled(pnd),
                                                   (uint8_t*) dtag.start,
                                                   dtag.size,
                                                   pn_delivery_remote_state(pnd),
                                                   qd_delivery_read_remote_state(pnd));
        qd_link_set_incoming_msg(link, (qd_message_t*) 0);  // msg no longer exclusive to qd_link
        qdr_node_connect_deliveries(link, delivery, pnd);
        qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver_to_routed_link");
        return next_delivery;
    }

    // Determine if the user of this connection is allowed to proxy the user_id
    // of messages. A message user_id is proxied when the value in the message
    // properties section differs from the authenticated user name of the
    // connection.  If the user is not allowed to proxy the user_id then the
    // message user_id must be blank or it must be equal to the connection user
    // name.
    //
    if (check_user) {
        // This connection must not allow proxied user_id
        qd_iterator_t *userid_iter  = qd_message_field_iterator(msg, QD_FIELD_USER_ID);
        if (userid_iter) {
            // The user_id property has been specified
            if (qd_iterator_remaining(userid_iter) > 0) {
                // user_id property in message is not blank
                if (!qd_iterator_equal(userid_iter, (const unsigned char *)conn->user_id)) {
                    // This message is rejected: attempted user proxy is disallowed
                    qd_log(router->log_source, QD_LOG_DEBUG,
                           "[C%"PRIu64"][L%"PRIu64"] Message rejected due to user_id proxy violation. User:%s",
                           conn->connection_id,
                           qd_link_link_id(link),
                           conn->user_id);
                    qd_message_set_discard(msg, true);
                    pn_link_flow(pn_link, 1);
                    _reject_delivery(pnd, QD_AMQP_COND_UNAUTHORIZED_ACCESS, "user_id proxy violation");
                    if (receive_complete) {
                        pn_delivery_settle(pnd);
                        qd_message_free(msg);
                    }
                    qd_iterator_free(userid_iter);
                    return next_delivery;
                }
            }
            qd_iterator_free(userid_iter);
        }
    }

    const char *ma_error = qd_message_parse_annotations(msg);
    if (ma_error) {
        qd_log(router->log_source, QD_LOG_WARNING,
               "[C%"PRIu64"][L%"PRIu64"] Message rejected - invalid MA section: %s",
               conn->connection_id, qd_link_link_id(link), ma_error);

        pn_condition_t *condition = pn_disposition_condition(pn_delivery_local(pnd));
        pn_condition_set_name(condition, "amqp:invalid-field");
        pn_condition_set_description(condition, ma_error);
        pn_delivery_update(pnd, PN_REJECTED);
        qd_message_set_discard(msg, true);
        pn_link_flow(pn_link, 1);
        if (receive_complete) {
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        }
        return next_delivery;
    }

    //
    // Head of line blocking avoidance (DISPATCH-1545)
    //
    // Before we can forward a message we need to determine whether or not this
    // message is "streaming" - a large message that has the potential to block
    // other messages sharing the trunk link.  At this point we cannot for sure
    // know the actual length of the incoming message, so we employ the
    // following heuristic to determine if the message is "streaming":
    //
    // - If the message is receive-complete it is NOT a streaming message.
    // - If it is NOT receive-complete:
    //   Continue buffering incoming data until:
    //   - receive has completed => NOT a streaming message
    //   - not rx-complete AND Q2 threshold hit => a streaming message
    //
    // Once Q2 is hit we MUST forward the message regardless of rx-complete
    // since Q2 will block forever unless the incoming data is drained via
    // forwarding.
    //
    if (!receive_complete) {
        if (qd_message_is_streaming(msg) || qd_message_is_Q2_blocked(msg)) {
            qd_log(router->log_source, QD_LOG_DEBUG,
                   "[C%"PRIu64"][L%"PRIu64"] Incoming message classified as streaming. User:%s",
                   conn->connection_id,
                   qd_link_link_id(link),
                   conn->user_id);
        } else {
            // Continue buffering this message
            return false;
        }
    }

    uint32_t       distance = 0;
    int            ingress_index = 0; // Default to _this_ router
    qd_bitmask_t  *link_exclusions = 0;
    qd_iterator_t *ingress_iter = process_router_annotations(router, msg, &link_exclusions, &distance, &ingress_index);

    //
    // If this delivery has traveled further than the known radius of the network topology (plus 1),
    // release and settle the delivery.  This can happen in the case of "flood" multicast where the
    // deliveries follow all available paths.  This will only discard messages that will reach their
    // destinations via shorter paths.
    //
    if (distance > (router->topology_radius + 1)) {
        qd_bitmask_free(link_exclusions);
        qd_message_set_discard(msg, true);
        pn_link_flow(pn_link, 1);
        pn_delivery_update(pnd, PN_RELEASED);
        if (receive_complete) {
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        }
        return next_delivery;
    }

    if (anonymous_link) {
        qd_iterator_t *addr_iter = 0;
        int phase = 0;

        //
        // If the message has delivery annotations, get the to-override field from the annotations.
        //
        qd_parsed_field_t *ma_to = qd_message_get_to_override(msg);
        if (ma_to) {
            addr_iter = qd_iterator_dup(qd_parse_raw(ma_to));
            phase = qd_message_get_phase_annotation(msg);
        }

        //
        // Still no destination address?  Use the TO field from the message properties.
        //
        if (!addr_iter) {
            addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO);

            //
            // If the address came from the TO field and we need to apply a tenant-space,
            // set the to-override with the annotated address.
            //
            if (addr_iter) {
                int tenant_space_length;
                const char *tenant_space = _get_tenant_space(conn, &tenant_space_length);
                if (tenant_space) {
                    qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_WITH_SPACE);
                    qd_iterator_annotate_space(addr_iter, tenant_space, tenant_space_length);
                    char *iter_str = (char *)qd_iterator_copy(addr_iter);
                    qd_message_set_to_override_annotation(msg, iter_str);
                    free(iter_str);
                }
            }
        }

        if (addr_iter) {
            if (!conn->policy_settings || qd_policy_approve_message_target(addr_iter, conn)) {
                qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
                if (phase > 0)
                    qd_iterator_annotate_phase(addr_iter, '0' + (char) phase);

                log_link_message(conn, pn_link, msg);
                delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd),
                                               link_exclusions, ingress_index,
                                               pn_delivery_remote_state(pnd),
                                               qd_delivery_read_remote_state(pnd));
            } else {
                //reject
                qd_log(router->log_source, QD_LOG_DEBUG,
                       "[C%"PRIu64"][L%"PRIu64"] Message rejected due to policy violation on target. User:%s",
                       conn->connection_id,
                       qd_link_link_id(link),
                       conn->user_id);
                qd_message_set_discard(msg, true);
                pn_link_flow(pn_link, 1);
                _reject_delivery(pnd, QD_AMQP_COND_UNAUTHORIZED_ACCESS, "policy violation on target");
                if (receive_complete) {
                    pn_delivery_settle(pnd);
                    qd_message_free(msg);
                }
                qd_iterator_free(addr_iter);
                qd_bitmask_free(link_exclusions);
                return next_delivery;
            }
        }
    } else {
        //
        // This is a targeted link, not anonymous.
        //

        //
        // Look in a series of locations for the terminus address, starting
        // with the qdr_link (in case this is an auto-link with separate
        // internal and external addresses).
        //
        const char *term_addr = qdr_link_internal_address(rlink);
        if (!term_addr) {
            term_addr = pn_terminus_get_address(qd_link_remote_target(link));
            if (!term_addr)
                term_addr = pn_terminus_get_address(qd_link_source(link));
        }

        if (term_addr) {
            int tenant_space_length;
            const char *tenant_space = _get_tenant_space(conn, &tenant_space_length);
            if (tenant_space) {
                qd_iterator_t *aiter = qd_iterator_string(term_addr, ITER_VIEW_ADDRESS_WITH_SPACE);
                qd_iterator_annotate_space(aiter, tenant_space, tenant_space_length);
                char *iter_str = (char *) qd_iterator_copy(aiter);
                qd_message_set_to_override_annotation(msg, iter_str);
                free(iter_str);
                qd_iterator_free(aiter);
            } else
                qd_message_set_to_override_annotation(msg, term_addr);

            int phase = qdr_link_phase(rlink);
            if (phase != 0)
                qd_message_set_phase_annotation(msg, phase);
        }

        log_link_message(conn, pn_link, msg);
        delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions, ingress_index,
                                    pn_delivery_remote_state(pnd),
                                    qd_delivery_read_remote_state(pnd));
    }

    //
    // End of new delivery processing
    //

    if (delivery) {
        qd_link_set_incoming_msg(link, (qd_message_t*) 0);  // msg no longer exclusive to qd_link
        qdr_node_connect_deliveries(link, delivery, pnd);
        qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver");
    } else {
        //
        // If there is no delivery, the message is now and will always be unroutable because there is no address.
        //
        qd_log(router->log_source, QD_LOG_DEBUG,
               "[C%"PRIu64"][L%"PRIu64"] Message rejected - no address present",
               conn->connection_id,
               qd_link_link_id(link));
        qd_bitmask_free(link_exclusions);
        qd_message_set_discard(msg, true);
        pn_link_flow(pn_link, 1);
        _reject_delivery(pnd, QD_AMQP_COND_PRECONDITION_FAILED, "Routing failure: no address present");
        if (receive_complete) {
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        }
    }

    return next_delivery;
}