void qd_container_handle_event()

in src/container.c [483:754]


/**
 * Handle a single Proton event for one connection.
 *
 * Switches on the type of @p event and performs the container-level handling
 * for connection, session, link, delivery, wake and transport-closed events.
 * Event types without a case are ignored.
 *
 * @param container  Container on whose behalf the event is handled; forwarded
 *                   to the helper routines and used as the log source.
 * @param event      The Proton event to process.
 * @param conn       The Proton connection whose endpoint state is consulted.
 * @param qd_conn    The dispatch-side connection record (policy counters,
 *                   cached sessions, free list).  Only the
 *                   PN_CONNECTION_REMOTE_CLOSE case tests it for NULL; the
 *                   other cases that read its fields do so without a check.
 */
void qd_container_handle_event(qd_container_t *container, pn_event_t *event,
                               pn_connection_t *conn, qd_connection_t *qd_conn)
{
    switch (pn_event_type(event)) {

    case PN_CONNECTION_REMOTE_OPEN: {
        qd_connection_set_user(qd_conn);
        qd_conn->open_container = (void *) container;

        if (!(pn_connection_state(conn) & PN_LOCAL_UNINIT)) {
            // The peer is answering an Open that we initiated ourselves.
            qd_policy_amqp_open_connector(qd_conn);
        } else {
            // The Open was initiated externally: let the policy engine decide.
            /* TODO aconway 2017-04-11: presently the policy test is run
             * in the current thread.
             *
             * If/when the policy test can run in another thread, the connection
             * can be stalled by saving the current pn_event_batch and passing it
             * to pn_proactor_done() when the policy check is complete. Note we
             * can't run the policy check as a deferred function on the current
             * connection since by stalling the current connection it will never be
             * run, so we need some other thread context to run it in.
             */
            qd_policy_amqp_open(qd_conn);
        }
        break;
    }

    case PN_CONNECTION_REMOTE_CLOSE: {
        if (qd_conn) {
            qd_conn->closed = true;
        }

        if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
            // Peer closed first: tidy up and answer with our own close.
            close_links(container, conn, false);
            qd_session_cleanup(qd_conn);
            pn_connection_close(conn);
            qd_conn_event_batch_complete(container, qd_conn, true);
        } else if (pn_connection_state(conn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
            // Both ends are closed: tidy up and report the closure.
            close_links(container, conn, false);
            qd_session_cleanup(qd_conn);
            notify_closed(container, qd_conn, qd_connection_get_context(qd_conn));
            qd_conn_event_batch_complete(container, qd_conn, true);
        }
        break;
    }

    case PN_SESSION_REMOTE_OPEN: {
        if (pn_connection_state(conn) & PN_LOCAL_CLOSED) {
            break;
        }

        pn_session_t *ssn = pn_event_session(event);
        if (!(pn_session_state(ssn) & PN_LOCAL_UNINIT)) {
            break;
        }

        // The remote peer created a new session.
        assert(qd_session_from_pn(ssn) == 0);
        qd_session_t *qd_ssn = qd_session(ssn);
        if (!qd_ssn) {
            // No session record could be obtained: refuse the session.
            pn_condition_t *cond = pn_session_condition(ssn);
            pn_condition_set_name(cond, QD_AMQP_COND_INTERNAL_ERROR);
            pn_condition_set_description(cond, "Insufficient memory");
            pn_session_close(ssn);
            break;
        }

        if (qd_conn->policy_settings) {
            if (!qd_policy_approve_amqp_session(ssn, qd_conn)) {
                qd_session_free(qd_ssn);
                break;
            }
            qd_conn->n_sessions++;
        }

        qd_session_link_pn(qd_ssn, ssn);
        qd_policy_apply_session_settings(ssn, qd_conn);
        pn_session_open(ssn);
        break;
    }

    case PN_SESSION_LOCAL_CLOSE: {
        pn_session_t *ssn = pn_event_session(event);

        // Forget the connection's cached reference to this session, if any.
        for (int slot = 0; slot < QD_SSN_CLASS_COUNT; ++slot) {
            if (qd_conn->pn_sessions[slot] == ssn) {
                qd_conn->pn_sessions[slot] = NULL;
                break;
            }
        }

        // Detach the qd_link records of this session's locally-active,
        // remotely-closed links from their pn_link.
        for (pn_link_t *plink = pn_link_head(conn, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED);
             plink;
             plink = pn_link_next(plink, PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
            if (pn_link_session(plink) != ssn) {
                continue;
            }
            qd_link_t *qlink = (qd_link_t *) pn_link_get_context(plink);
            if (qlink) {
                qlink->pn_link = NULL;
            }
        }

        if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
            qd_session_free(qd_session_from_pn(ssn));
            add_session_to_free_list(&qd_conn->free_link_session_list, ssn);
        }
        break;
    }

    case PN_SESSION_REMOTE_CLOSE: {
        pn_session_t *ssn = pn_event_session(event);

        // Forget the connection's cached reference to this session, if any.
        for (int slot = 0; slot < QD_SSN_CLASS_COUNT; ++slot) {
            if (qd_conn->pn_sessions[slot] == ssn) {
                qd_conn->pn_sessions[slot] = NULL;
                break;
            }
        }

        if (pn_connection_state(conn) & PN_LOCAL_CLOSED) {
            break;
        }

        if (pn_session_state(ssn) == (PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED)) {
            // The remote end has ended our session.  No detaches will arrive
            // on it, so any link that was left open is forcibly detached here.
            pn_connection_t *ssn_conn = pn_session_connection(ssn);

            // Walk every pn_link of the connection (state filter 0) so that
            // none belonging to this session is missed, and clear each
            // matching qd_link's pn_link reference.
            for (pn_link_t *plink = pn_link_head(ssn_conn, 0);
                 plink;
                 plink = pn_link_next(plink, 0)) {
                if (pn_link_session(plink) != ssn) {
                    continue;
                }

                qd_link_t *qlink = (qd_link_t *) pn_link_get_context(plink);

                if (pn_link_state(plink) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)
                    && qlink && qlink->node) {
                    if (qd_conn->policy_settings) {
                        if (qlink->direction == QD_OUTGOING) {
                            qd_conn->n_receivers--;
                            assert(qd_conn->n_receivers >= 0);
                        } else {
                            qd_conn->n_senders--;
                            assert(qd_conn->n_senders >= 0);
                        }
                    }
                    qd_log(container->log_source, QD_LOG_DEBUG,
                           "Aborting link '%s' due to parent session end",
                           pn_link_name(plink));
                    qlink->node->ntype->link_detach_handler(qlink->node->context,
                                                            qlink, QD_LOST);
                }

                if (qlink) {
                    qlink->pn_link = NULL;
                }
            }

            if (qd_conn->policy_settings) {
                qd_conn->n_sessions--;
            }

            pn_session_close(ssn);
        } else if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
            qd_session_free(qd_session_from_pn(ssn));
            add_session_to_free_list(&qd_conn->free_link_session_list, ssn);
        }
        break;
    }

    case PN_LINK_REMOTE_OPEN: {
        if (pn_connection_state(conn) & PN_LOCAL_CLOSED) {
            break;
        }

        pn_link_t *plink = pn_event_link(event);

        if (!(pn_link_state(plink) & PN_LOCAL_UNINIT)) {
            // Our end is already initialised; only a locally-active link is
            // passed to the open handler.
            if (pn_link_state(plink) & PN_LOCAL_ACTIVE) {
                handle_link_open(container, plink);
            }
            break;
        }

        if (pn_link_is_sender(plink)) {
            // A local sender link is counted against n_receivers.
            if (qd_conn->policy_settings) {
                if (!qd_policy_approve_amqp_receiver_link(plink, qd_conn)) {
                    break;
                }
                qd_conn->n_receivers++;
            }
            setup_outgoing_link(container, plink);
        } else {
            // A local receiver link is counted against n_senders.
            if (qd_conn->policy_settings) {
                if (!qd_policy_approve_amqp_sender_link(plink, qd_conn)) {
                    break;
                }
                qd_conn->n_senders++;
            }
            setup_incoming_link(container, plink, qd_connection_max_message_size(qd_conn));
        }
        break;
    }

    case PN_LINK_REMOTE_DETACH:
    case PN_LINK_REMOTE_CLOSE: {
        if (pn_connection_state(conn) & PN_LOCAL_CLOSED) {
            break;
        }

        pn_link_t *plink = pn_event_link(event);
        qd_link_t *qlink = (qd_link_t *) pn_link_get_context(plink);

        if (!qlink) {
            // No qd_link is attached to this pn_link: just queue it for freeing.
            add_link_to_free_list(&qd_conn->free_link_session_list, plink);
            break;
        }

        qd_node_t *node = qlink->node;
        qd_detach_type_t dt = QD_DETACHED;
        if (pn_event_type(event) == PN_LINK_REMOTE_CLOSE) {
            dt = QD_CLOSED;
        }

        // With no node to run a detach handler, close the link here.
        if (!node && qlink->pn_link == plink) {
            pn_link_close(plink);
        }

        if (qd_conn->policy_counted && qd_conn->policy_settings) {
            if (pn_link_is_sender(plink)) {
                qd_conn->n_receivers--;
                qd_log(container->log_source, QD_LOG_TRACE,
                       "Closed receiver link %s. n_receivers: %d",
                       pn_link_name(plink), qd_conn->n_receivers);
                assert(qd_conn->n_receivers >= 0);
            } else {
                qd_conn->n_senders--;
                qd_log(container->log_source, QD_LOG_TRACE,
                       "Closed sender link %s. n_senders: %d",
                       pn_link_name(plink), qd_conn->n_senders);
                assert(qd_conn->n_senders >= 0);
            }
        }

        if (pn_link_state(plink) & PN_LOCAL_CLOSED) {
            add_link_to_free_list(&qd_conn->free_link_session_list, plink);
        }

        if (node) {
            node->ntype->link_detach_handler(node->context, qlink, dt);
        }
        break;
    }

    case PN_LINK_LOCAL_DETACH:
    case PN_LINK_LOCAL_CLOSE: {
        pn_link_t *plink = pn_event_link(event);

        // Queue the link for freeing only once both ends are closed.
        if (pn_link_state(plink) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) {
            add_link_to_free_list(&qd_conn->free_link_session_list, plink);
        }
        break;
    }

    case PN_LINK_FLOW: {
        pn_link_t *plink = pn_event_link(event);
        qd_link_t *qlink = (qd_link_t *) pn_link_get_context(plink);

        // The flow handler is optional; skip links without a node or handler.
        if (!qlink || !qlink->node || !qlink->node->ntype->link_flow_handler) {
            break;
        }
        qlink->node->ntype->link_flow_handler(qlink->node->context, qlink);
        break;
    }

    case PN_DELIVERY: {
        pn_delivery_t *dlv   = pn_event_delivery(event);
        pn_link_t     *plink = pn_event_link(event);

        if (pn_delivery_readable(dlv)) {
            do_receive(plink, dlv);
        }

        if (pn_delivery_updated(dlv) || pn_delivery_settled(dlv)) {
            do_updated(dlv);
            pn_delivery_clear(dlv);
        }
        break;
    }

    case PN_CONNECTION_WAKE: {
        // Wake-ups for a connection already marked closed are ignored.
        if (!qd_conn->closed) {
            writable_handler(container, conn, qd_conn);
        }
        break;
    }

    case PN_TRANSPORT_CLOSED: {
        close_handler(container, conn, qd_conn);
        break;
    }

    default:
        break;
    }
}