in src/container.c [483:754]
void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
pn_connection_t *conn, qd_connection_t *qd_conn)
{
pn_session_t *ssn = NULL;
pn_link_t *pn_link = NULL;
qd_link_t *qd_link = NULL;
pn_delivery_t *delivery = NULL;
switch (pn_event_type(event)) {
case PN_CONNECTION_REMOTE_OPEN :
qd_connection_set_user(qd_conn);
qd_conn->open_container = (void *)container;
if (pn_connection_state(conn) & PN_LOCAL_UNINIT) {
// This Open is an externally initiated connection
// Let policy engine decide
/* TODO aconway 2017-04-11: presently the policy test is run
* in the current thread.
*
* If/when the policy test can run in another thread, the connection
* can be stalled by saving the current pn_event_batch and passing it
* to pn_proactor_done() when the policy check is complete. Note we
* can't run the policy check as a deferred function on the current
* connection since by stalling the current connection it will never be
* run, so we need some other thread context to run it in.
*/
qd_policy_amqp_open(qd_conn);
} else {
// This Open is in response to an internally initiated connection
qd_policy_amqp_open_connector(qd_conn);
}
break;
case PN_CONNECTION_REMOTE_CLOSE :
if (qd_conn)
qd_conn->closed = true;
if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
close_links(container, conn, false);
qd_session_cleanup(qd_conn);
pn_connection_close(conn);
qd_conn_event_batch_complete(container, qd_conn, true);
} else if (pn_connection_state(conn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
close_links(container, conn, false);
qd_session_cleanup(qd_conn);
notify_closed(container, qd_conn, qd_connection_get_context(qd_conn));
qd_conn_event_batch_complete(container, qd_conn, true);
}
break;
case PN_SESSION_REMOTE_OPEN :
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
ssn = pn_event_session(event);
if (pn_session_state(ssn) & PN_LOCAL_UNINIT) {
// remote created new session
assert(qd_session_from_pn(ssn) == 0);
qd_session_t *qd_ssn = qd_session(ssn);
if (!qd_ssn) {
pn_condition_t *cond = pn_session_condition(ssn);
pn_condition_set_name(cond, QD_AMQP_COND_INTERNAL_ERROR);
pn_condition_set_description(cond, "Insufficient memory");
pn_session_close(ssn);
break;
}
if (qd_conn->policy_settings) {
if (!qd_policy_approve_amqp_session(ssn, qd_conn)) {
qd_session_free(qd_ssn);
break;
}
qd_conn->n_sessions++;
}
qd_session_link_pn(qd_ssn, ssn);
qd_policy_apply_session_settings(ssn, qd_conn);
pn_session_open(ssn);
}
}
break;
case PN_SESSION_LOCAL_CLOSE :
ssn = pn_event_session(event);
for (int i = 0; i < QD_SSN_CLASS_COUNT; ++i) {
if (ssn == qd_conn->pn_sessions[i]) {
qd_conn->pn_sessions[i] = 0;
break;
}
}
pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
while (pn_link) {
if (pn_link_session(pn_link) == ssn) {
qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if (qd_link)
qd_link->pn_link = 0;
}
pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
}
if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
qd_session_t *qd_ssn = qd_session_from_pn(ssn);
qd_session_free(qd_ssn);
add_session_to_free_list(&qd_conn->free_link_session_list, ssn);
}
break;
case PN_SESSION_REMOTE_CLOSE :
ssn = pn_event_session(event);
for (int i = 0; i < QD_SSN_CLASS_COUNT; ++i) {
if (ssn == qd_conn->pn_sessions[i]) {
qd_conn->pn_sessions[i] = 0;
break;
}
}
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
// remote has nuked our session. Check for any links that were
// left open and forcibly detach them, since no detaches will
// arrive on this session.
pn_connection_t *conn = pn_session_connection(ssn);
//Sweep thru every pn_link in this connection and a matching session and zero out the
// qd_link->pn_link reference. We do this in order to not miss any pn_links
pn_link = pn_link_head(conn, 0);
while (pn_link) {
if (pn_link_session(pn_link) == ssn) {
qd_link_t *qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if ((pn_link_state(pn_link) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE))) {
if (qd_link && qd_link->node) {
if (qd_conn->policy_settings) {
if (qd_link->direction == QD_OUTGOING) {
qd_conn->n_receivers--;
assert(qd_conn->n_receivers >= 0);
} else {
qd_conn->n_senders--;
assert(qd_conn->n_senders >= 0);
}
}
qd_log(container->log_source, QD_LOG_DEBUG,
"Aborting link '%s' due to parent session end",
pn_link_name(pn_link));
qd_link->node->ntype->link_detach_handler(qd_link->node->context,
qd_link, QD_LOST);
}
}
if (qd_link)
qd_link->pn_link = 0;
}
pn_link = pn_link_next(pn_link, 0);
}
if (qd_conn->policy_settings) {
qd_conn->n_sessions--;
}
pn_session_close(ssn);
}
else if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
qd_session_t *qd_ssn = qd_session_from_pn(ssn);
qd_session_free(qd_ssn);
add_session_to_free_list(&qd_conn->free_link_session_list, ssn);
}
}
break;
case PN_LINK_REMOTE_OPEN :
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
pn_link = pn_event_link(event);
if (pn_link_state(pn_link) & PN_LOCAL_UNINIT) {
if (pn_link_is_sender(pn_link)) {
if (qd_conn->policy_settings) {
if (!qd_policy_approve_amqp_receiver_link(pn_link, qd_conn)) {
break;
}
qd_conn->n_receivers++;
}
setup_outgoing_link(container, pn_link);
} else {
if (qd_conn->policy_settings) {
if (!qd_policy_approve_amqp_sender_link(pn_link, qd_conn)) {
break;
}
qd_conn->n_senders++;
}
setup_incoming_link(container, pn_link, qd_connection_max_message_size(qd_conn));
}
} else if (pn_link_state(pn_link) & PN_LOCAL_ACTIVE)
handle_link_open(container, pn_link);
}
break;
case PN_LINK_REMOTE_DETACH :
case PN_LINK_REMOTE_CLOSE :
if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) {
pn_link = pn_event_link(event);
qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if (qd_link) {
qd_node_t *node = qd_link->node;
qd_detach_type_t dt = pn_event_type(event) == PN_LINK_REMOTE_CLOSE ? QD_CLOSED : QD_DETACHED;
if (!node && qd_link->pn_link == pn_link) {
pn_link_close(pn_link);
}
if (qd_conn->policy_counted && qd_conn->policy_settings) {
if (pn_link_is_sender(pn_link)) {
qd_conn->n_receivers--;
qd_log(container->log_source, QD_LOG_TRACE,
"Closed receiver link %s. n_receivers: %d",
pn_link_name(pn_link), qd_conn->n_receivers);
assert (qd_conn->n_receivers >= 0);
} else {
qd_conn->n_senders--;
qd_log(container->log_source, QD_LOG_TRACE,
"Closed sender link %s. n_senders: %d",
pn_link_name(pn_link), qd_conn->n_senders);
assert (qd_conn->n_senders >= 0);
}
}
if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) {
add_link_to_free_list(&qd_conn->free_link_session_list, pn_link);
}
if (node) {
node->ntype->link_detach_handler(node->context, qd_link, dt);
}
} else {
add_link_to_free_list(&qd_conn->free_link_session_list, pn_link);
}
}
break;
case PN_LINK_LOCAL_DETACH:
case PN_LINK_LOCAL_CLOSE:
pn_link = pn_event_link(event);
if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
add_link_to_free_list(&qd_conn->free_link_session_list, pn_link);
}
break;
case PN_LINK_FLOW :
pn_link = pn_event_link(event);
qd_link = (qd_link_t*) pn_link_get_context(pn_link);
if (qd_link && qd_link->node && qd_link->node->ntype->link_flow_handler)
qd_link->node->ntype->link_flow_handler(qd_link->node->context, qd_link);
break;
case PN_DELIVERY :
delivery = pn_event_delivery(event);
pn_link = pn_event_link(event);
if (pn_delivery_readable(delivery))
do_receive(pn_link, delivery);
if (pn_delivery_updated(delivery) || pn_delivery_settled(delivery)) {
do_updated(delivery);
pn_delivery_clear(delivery);
}
break;
case PN_CONNECTION_WAKE:
if (!qd_conn->closed)
writable_handler(container, conn, qd_conn);
break;
case PN_TRANSPORT_CLOSED:
close_handler(container, conn, qd_conn);
break;
default:
break;
}
}