in src/router_node.c [411:873]
/**
 * Inbound delivery handler for a receiving AMQP link.
 *
 * Pulls whatever data is available for the link's current proton delivery into
 * a qd_message_t and then, depending on how far along the message is, either
 *   - keeps buffering (not enough of the message has arrived yet),
 *   - rejects/releases/discards it (oversize, invalid, policy violation,
 *     no address, travelled too far, link gone), or
 *   - hands it to the router core (link-routed, anonymous or targeted link),
 *     or continues a delivery that was already handed over on an earlier call.
 *
 * @param context  the qd_router_t this handler is registered for
 * @param link     the link whose current delivery is to be processed
 * @return true only when the current delivery was fully received, the link was
 *         advanced past it, and another delivery is now current on the link;
 *         false in every other case (nothing to read, connection closed
 *         locally, message oversize, message still being buffered, ...).
 *         NOTE(review): presumably the caller re-invokes the handler while
 *         true is returned - confirm against the caller.
 */
static bool AMQP_rx_handler(void* context, qd_link_t *link)
{
    qd_router_t *router  = (qd_router_t*) context;
    pn_link_t   *pn_link = qd_link_pn(link);

    assert(pn_link);
    if (!pn_link)
        return false;

    // ensure the current delivery is readable
    pn_delivery_t *pnd = pn_link_current(pn_link);
    if (!pnd)
        return false;

    qd_connection_t *conn = qd_link_connection(link);

    // DISPATCH-1628 DISPATCH-975 exit if router already closed this connection.
    // A missing connection is treated the same way: conn is dereferenced
    // throughout the rest of this function, and nothing has been consumed yet
    // so bailing out here has no side effects.
    // NOTE(review): whether qd_link_connection() can actually return NULL at
    // this point is not visible from this function - the check is defensive.
    if (!conn || conn->closed_locally) {
        return false;
    }

    qdr_delivery_t *delivery      = qdr_node_delivery_qdr_from_pn(pnd);
    bool            next_delivery = false;

    //
    // Receive the message into a local representation.
    //
    qd_message_t *msg              = qd_message_receive(pnd);
    bool          receive_complete = qd_message_receive_complete(msg);

    if (!qd_message_oversize(msg)) {
        // message not rejected as oversize
        if (receive_complete) {
            //
            // The entire message has been received and we are ready to consume the delivery by calling pn_link_advance().
            //
            pn_link_advance(pn_link);
            next_delivery = pn_link_current(pn_link) != 0;
        }

        if (qd_message_is_discard(msg)) {
            //
            // Message has been marked for discard, no further processing necessary
            //
            if (receive_complete) {
                // If this discarded delivery has already been settled by proton,
                // set the presettled flag on the delivery to true if it is not already true.
                // Since the entire message has already been received, we directly call the
                // function to set the pre-settled flag since we cannot go thru the core-thread
                // to do this since the delivery has been discarded.
                // Discarded streaming deliveries are not put thru the core thread via the continue action.
                //
                // NOTE(review): delivery may be NULL here (it is only NULL-checked further
                // down); presumably the two qdr_delivery_* calls below tolerate NULL - verify.
                if (pn_delivery_settled(pnd))
                    qdr_delivery_set_presettled(delivery);

                uint64_t local_disp = qdr_delivery_disposition(delivery);
                //
                // Call pn_delivery_update only if the local disposition is different than the pn_delivery's local disposition.
                // This will make sure we call pn_delivery_update only when necessary.
                //
                if (local_disp != 0 && local_disp != pn_delivery_local_state(pnd)) {
                    //
                    // DISPATCH-1626 - This enables pn_delivery_update() and pn_delivery_settle() to be called back to back in the same function call.
                    // CORE_delivery_update() will handle most of the other cases where we need to call pn_delivery_update() followed by pn_delivery_settle().
                    //
                    pn_delivery_update(pnd, local_disp);
                }

                // note: expected that the code that set discard has handled
                // setting disposition and updating flow!
                pn_delivery_settle(pnd);
                if (delivery) {
                    // if delivery already exists then the core thread discarded this
                    // delivery, it will eventually free the qdr_delivery_t and its
                    // associated message - do not free it here.
                    qdr_node_disconnect_deliveries(router->router_core, link, delivery, pnd);
                } else {
                    qd_message_free(msg);
                }
            }
            return next_delivery;
        }
    } else {
        // message is oversize
        if (receive_complete) {
            // set condition, reject, and settle the incoming delivery
            _reject_delivery(pnd, QD_AMQP_COND_MESSAGE_SIZE_EXCEEDED, QD_AMQP_COND_OVERSIZE_DESCRIPTION);
            pn_delivery_settle(pnd);

            // close the link
            pn_link_close(pn_link);

            // set condition and close the connection
            pn_connection_t * pn_conn = qd_connection_pn(conn);
            pn_condition_t * cond = pn_connection_condition(pn_conn);
            (void) pn_condition_set_name(       cond, QD_AMQP_COND_CONNECTION_FORCED);
            (void) pn_condition_set_description(cond, QD_AMQP_COND_OVERSIZE_DESCRIPTION);
            pn_connection_close(pn_conn);

            if (!delivery) {
                // this message has not been forwarded yet, so it will not be
                // cleaned up when the link is freed.
                qd_message_free(msg);
            }

            // stop all message reception on this connection
            conn->closed_locally = true;
        }
        // oversize messages are not processed any further
        return false;
    }

    //
    // If the delivery already exists we've already passed it to the core (2nd
    // frame for a multi-frame transfer). Simply continue.
    //
    if (delivery) {
        qdr_delivery_continue(router->router_core, delivery, pn_delivery_settled(pnd));
        return next_delivery;
    }

    //
    // No pre-existing delivery means we're starting a new delivery or
    // continuing a delivery that has not accumulated enough of the message
    // for forwarding.
    //
    qdr_link_t *rlink = (qdr_link_t*) qd_link_get_context(link);
    if (!rlink) {
        // receive link was closed or deleted - can't be forwarded
        // so no use setting disposition or adding flow
        qd_message_set_discard(msg, true);
        if (receive_complete) {
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        }
        return next_delivery;
    }

    //
    // Validate the content of the delivery as an AMQP message. This is done
    // partially, only to validate that we can find the fields we need to route
    // the message.
    //
    // If per-message tracing is configured then validate the sections
    // necessary for logging.
    //
    // link-routing: it is not necessary to validate any sections, but doing so
    // will force a message validity check and ensure the message is not null.
    //
    // If the link is anonymous, we must validate through the message
    // properties to find the 'to' field. If the link is not anonymous, we
    // don't need the 'to' field as we will be using the address from the link
    // target.
    //
    // Check if the user id needs to be validated (see below). If it does we
    // need to validate the message properties section.
    //
    // Otherwise check the message annotations for router annotations necessary
    // for forwarding.
    //
    const bool link_routed    = qdr_link_is_routed(rlink);
    const bool anonymous_link = qdr_link_is_anonymous(rlink);
    const bool check_user     = (conn->policy_settings && !conn->policy_settings->spec.allowUserIdProxy);
    const qd_server_config_t *cf = qd_connection_config(conn);
    const qd_message_depth_t depth = (cf && cf->log_bits != 0) ? QD_DEPTH_APPLICATION_PROPERTIES
                                     : (link_routed) ? QD_DEPTH_HEADER
                                     : (anonymous_link || check_user) ? QD_DEPTH_PROPERTIES
                                     : QD_DEPTH_MESSAGE_ANNOTATIONS;

    const qd_message_depth_status_t depth_valid = qd_message_check_depth(msg, depth);
    switch (depth_valid) {
    case QD_MESSAGE_DEPTH_INVALID:
        qd_log(router->log_source, QD_LOG_DEBUG,
               "[C%"PRIu64"][L%"PRIu64"] Incoming message validation failed - rejected",
               conn->connection_id,
               qd_link_link_id(link));
        qd_message_set_discard(msg, true);
        pn_link_flow(pn_link, 1);
        _reject_delivery(pnd, QD_AMQP_COND_DECODE_ERROR, "invalid message format");
        // NOTE(review): unlike the other reject paths in this function, this one
        // settles and frees without checking receive_complete. Left as-is because
        // it cannot be told from here whether that is intentional - confirm.
        pn_delivery_settle(pnd);
        qd_message_free(msg);
        return next_delivery;
    case QD_MESSAGE_DEPTH_INCOMPLETE:
        return false;  // stop rx processing
    case QD_MESSAGE_DEPTH_OK:
        break;
    }

    // Handle the link-routed case
    //
    if (link_routed) {
        pn_delivery_tag_t dtag = pn_delivery_tag(pnd);

        if (dtag.size > QDR_DELIVERY_TAG_MAX) {
            // dtag.size is a size_t, hence %zu (was %zd)
            qd_log(router->log_source, QD_LOG_DEBUG, "link route delivery failure: msg tag size exceeded %zu (max=%d)",
                   dtag.size, QDR_DELIVERY_TAG_MAX);
            qd_message_set_discard(msg, true);
            pn_link_flow(pn_link, 1);
            _reject_delivery(pnd, QD_AMQP_COND_INVALID_FIELD, "delivery tag length exceeded");
            if (receive_complete) {
                pn_delivery_settle(pnd);
                qd_message_free(msg);
            }
            return next_delivery;
        }

        log_link_message(conn, pn_link, msg);
        delivery = qdr_link_deliver_to_routed_link(rlink,
                                                   msg,
                                                   pn_delivery_settled(pnd),
                                                   (uint8_t*) dtag.start,
                                                   dtag.size,
                                                   pn_delivery_remote_state(pnd),
                                                   qd_delivery_read_remote_state(pnd));
        qd_link_set_incoming_msg(link, (qd_message_t*) 0);  // msg no longer exclusive to qd_link
        qdr_node_connect_deliveries(link, delivery, pnd);
        qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver_to_routed_link");
        return next_delivery;
    }

    // Determine if the user of this connection is allowed to proxy the user_id
    // of messages. A message user_id is proxied when the value in the message
    // properties section differs from the authenticated user name of the
    // connection. If the user is not allowed to proxy the user_id then the
    // message user_id must be blank or it must be equal to the connection user
    // name.
    //
    if (check_user) {
        // This connection must not allow proxied user_id
        qd_iterator_t *userid_iter = qd_message_field_iterator(msg, QD_FIELD_USER_ID);
        if (userid_iter) {
            // The user_id property has been specified
            if (qd_iterator_remaining(userid_iter) > 0) {
                // user_id property in message is not blank
                if (!qd_iterator_equal(userid_iter, (const unsigned char *)conn->user_id)) {
                    // This message is rejected: attempted user proxy is disallowed
                    qd_log(router->log_source, QD_LOG_DEBUG,
                           "[C%"PRIu64"][L%"PRIu64"] Message rejected due to user_id proxy violation. User:%s",
                           conn->connection_id,
                           qd_link_link_id(link),
                           conn->user_id);
                    qd_message_set_discard(msg, true);
                    pn_link_flow(pn_link, 1);
                    _reject_delivery(pnd, QD_AMQP_COND_UNAUTHORIZED_ACCESS, "user_id proxy violation");
                    if (receive_complete) {
                        pn_delivery_settle(pnd);
                        qd_message_free(msg);
                    }
                    qd_iterator_free(userid_iter);
                    return next_delivery;
                }
            }
            qd_iterator_free(userid_iter);
        }
    }

    // Parse the message annotations; a non-NULL result is an error description
    // and causes the delivery to be rejected with "amqp:invalid-field".
    const char *ma_error = qd_message_parse_annotations(msg);
    if (ma_error) {
        qd_log(router->log_source, QD_LOG_WARNING,
               "[C%"PRIu64"][L%"PRIu64"] Message rejected - invalid MA section: %s",
               conn->connection_id, qd_link_link_id(link), ma_error);
        pn_condition_t *condition = pn_disposition_condition(pn_delivery_local(pnd));
        pn_condition_set_name(condition, "amqp:invalid-field");
        pn_condition_set_description(condition, ma_error);
        pn_delivery_update(pnd, PN_REJECTED);
        qd_message_set_discard(msg, true);
        pn_link_flow(pn_link, 1);
        if (receive_complete) {
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        }
        return next_delivery;
    }

    //
    // Head of line blocking avoidance (DISPATCH-1545)
    //
    // Before we can forward a message we need to determine whether or not this
    // message is "streaming" - a large message that has the potential to block
    // other messages sharing the trunk link. At this point we cannot for sure
    // know the actual length of the incoming message, so we employ the
    // following heuristic to determine if the message is "streaming":
    //
    // - If the message is receive-complete it is NOT a streaming message.
    // - If it is NOT receive-complete:
    //   Continue buffering incoming data until:
    //   - receive has completed => NOT a streaming message
    //   - not rx-complete AND Q2 threshold hit => a streaming message
    //
    // Once Q2 is hit we MUST forward the message regardless of rx-complete
    // since Q2 will block forever unless the incoming data is drained via
    // forwarding.
    //
    if (!receive_complete) {
        if (qd_message_is_streaming(msg) || qd_message_is_Q2_blocked(msg)) {
            qd_log(router->log_source, QD_LOG_DEBUG,
                   "[C%"PRIu64"][L%"PRIu64"] Incoming message classified as streaming. User:%s",
                   conn->connection_id,
                   qd_link_link_id(link),
                   conn->user_id);
        } else {
            // Continue buffering this message
            return false;
        }
    }

    uint32_t       distance        = 0;
    int            ingress_index   = 0; // Default to _this_ router
    qd_bitmask_t  *link_exclusions = 0;
    qd_iterator_t *ingress_iter    = process_router_annotations(router, msg, &link_exclusions, &distance, &ingress_index);

    //
    // If this delivery has traveled further than the known radius of the network topology (plus 1),
    // release and settle the delivery. This can happen in the case of "flood" multicast where the
    // deliveries follow all available paths. This will only discard messages that will reach their
    // destinations via shorter paths.
    //
    if (distance > (router->topology_radius + 1)) {
        qd_bitmask_free(link_exclusions);
        qd_message_set_discard(msg, true);
        pn_link_flow(pn_link, 1);
        pn_delivery_update(pnd, PN_RELEASED);
        if (receive_complete) {
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        }
        return next_delivery;
    }

    if (anonymous_link) {
        qd_iterator_t *addr_iter = 0;
        int phase = 0;

        //
        // If the message has delivery annotations, get the to-override field from the annotations.
        //
        qd_parsed_field_t *ma_to = qd_message_get_to_override(msg);
        if (ma_to) {
            addr_iter = qd_iterator_dup(qd_parse_raw(ma_to));
            phase = qd_message_get_phase_annotation(msg);
        }

        //
        // Still no destination address? Use the TO field from the message properties.
        //
        if (!addr_iter) {
            addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO);

            //
            // If the address came from the TO field and we need to apply a tenant-space,
            // set the to-override with the annotated address.
            //
            if (addr_iter) {
                int tenant_space_length;
                const char *tenant_space = _get_tenant_space(conn, &tenant_space_length);
                if (tenant_space) {
                    qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_WITH_SPACE);
                    qd_iterator_annotate_space(addr_iter, tenant_space, tenant_space_length);
                    char *iter_str = (char *)qd_iterator_copy(addr_iter);
                    qd_message_set_to_override_annotation(msg, iter_str);
                    free(iter_str);
                }
            }
        }

        if (addr_iter) {
            if (!conn->policy_settings || qd_policy_approve_message_target(addr_iter, conn)) {
                qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
                if (phase > 0)
                    qd_iterator_annotate_phase(addr_iter, '0' + (char) phase);

                log_link_message(conn, pn_link, msg);
                // addr_iter and link_exclusions are not freed on this path -
                // presumably the core takes ownership of both; verify.
                delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd),
                                               link_exclusions, ingress_index,
                                               pn_delivery_remote_state(pnd),
                                               qd_delivery_read_remote_state(pnd));
            } else {
                //reject
                qd_log(router->log_source, QD_LOG_DEBUG,
                       "[C%"PRIu64"][L%"PRIu64"] Message rejected due to policy violation on target. User:%s",
                       conn->connection_id,
                       qd_link_link_id(link),
                       conn->user_id);
                qd_message_set_discard(msg, true);
                pn_link_flow(pn_link, 1);
                _reject_delivery(pnd, QD_AMQP_COND_UNAUTHORIZED_ACCESS, "policy violation on target");
                if (receive_complete) {
                    pn_delivery_settle(pnd);
                    qd_message_free(msg);
                }
                qd_iterator_free(addr_iter);
                qd_bitmask_free(link_exclusions);
                return next_delivery;
            }
        }
    } else {
        //
        // This is a targeted link, not anonymous.
        //

        //
        // Look in a series of locations for the terminus address, starting
        // with the qdr_link (in case this is an auto-link with separate
        // internal and external addresses).
        //
        const char *term_addr = qdr_link_internal_address(rlink);
        if (!term_addr) {
            term_addr = pn_terminus_get_address(qd_link_remote_target(link));
            if (!term_addr)
                term_addr = pn_terminus_get_address(qd_link_source(link));
        }

        if (term_addr) {
            int tenant_space_length;
            const char *tenant_space = _get_tenant_space(conn, &tenant_space_length);
            if (tenant_space) {
                qd_iterator_t *aiter = qd_iterator_string(term_addr, ITER_VIEW_ADDRESS_WITH_SPACE);
                qd_iterator_annotate_space(aiter, tenant_space, tenant_space_length);
                char *iter_str = (char *) qd_iterator_copy(aiter);
                qd_message_set_to_override_annotation(msg, iter_str);
                free(iter_str);
                qd_iterator_free(aiter);
            } else
                qd_message_set_to_override_annotation(msg, term_addr);

            int phase = qdr_link_phase(rlink);
            if (phase != 0)
                qd_message_set_phase_annotation(msg, phase);
        }

        log_link_message(conn, pn_link, msg);
        delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions, ingress_index,
                                    pn_delivery_remote_state(pnd),
                                    qd_delivery_read_remote_state(pnd));
    }

    //
    // End of new delivery processing
    //

    if (delivery) {
        qd_link_set_incoming_msg(link, (qd_message_t*) 0);  // msg no longer exclusive to qd_link
        qdr_node_connect_deliveries(link, delivery, pnd);
        qdr_delivery_decref(router->router_core, delivery, "release protection of return from deliver");
    } else {
        //
        // If there is no delivery, the message is now and will always be unroutable because there is no address.
        //
        qd_log(router->log_source, QD_LOG_DEBUG,
               "[C%"PRIu64"][L%"PRIu64"] Message rejected - no address present",
               conn->connection_id,
               qd_link_link_id(link));
        qd_bitmask_free(link_exclusions);
        qd_message_set_discard(msg, true);
        pn_link_flow(pn_link, 1);
        _reject_delivery(pnd, QD_AMQP_COND_PRECONDITION_FAILED, "Routing failure: no address present");
        if (receive_complete) {
            pn_delivery_settle(pnd);
            qd_message_free(msg);
        }
    }

    return next_delivery;
}