int qdr_connection_process()

in src/router_core/connections.c [311:488]


int qdr_connection_process(qdr_connection_t *conn)
{
    qdr_connection_work_list_t  work_list;
    qdr_link_ref_list_t         links_with_work[QDR_N_PRIORITIES];
    qdr_core_t                 *core = conn->core;

    qdr_link_ref_t *ref;
    qdr_link_t     *link;
    bool            detach_sent;

    int event_count = 0;

    if (conn->closed) {
        conn->protocol_adaptor->conn_close_handler(conn->protocol_adaptor->user_context, conn, conn->error);
        return 0;
    }

    sys_mutex_lock(conn->work_lock);
    DEQ_MOVE(conn->work_list, work_list);
    for (int priority = 0; priority <= QDR_MAX_PRIORITY; ++ priority) {
        DEQ_MOVE(conn->links_with_work[priority], links_with_work[priority]);

        //
        // Move the references from CLASS_WORK to CLASS_LOCAL so concurrent action in the core
        // thread doesn't assume these links are referenced from the connection's list.
        //
        ref = DEQ_HEAD(links_with_work[priority]);
        while (ref) {
            move_link_ref(ref->link, QDR_LINK_LIST_CLASS_WORK, QDR_LINK_LIST_CLASS_LOCAL);
            ref->link->processing = true;
            ref = DEQ_NEXT(ref);
        }
    }
    sys_mutex_unlock(conn->work_lock);

    event_count += DEQ_SIZE(work_list);
    qdr_connection_work_t *work = DEQ_HEAD(work_list);
    while (work) {
        DEQ_REMOVE_HEAD(work_list);

        switch (work->work_type) {
        case QDR_CONNECTION_WORK_FIRST_ATTACH :
            conn->protocol_adaptor->first_attach_handler(conn->protocol_adaptor->user_context, conn, work->link, work->source, work->target, work->ssn_class);
            break;

        case QDR_CONNECTION_WORK_SECOND_ATTACH :
            conn->protocol_adaptor->second_attach_handler(conn->protocol_adaptor->user_context, work->link, work->source, work->target);
            break;

        case QDR_CONNECTION_WORK_TRACING_ON :
            conn->protocol_adaptor->conn_trace_handler(conn->protocol_adaptor->user_context, conn, true);
            break;

        case QDR_CONNECTION_WORK_TRACING_OFF :
            conn->protocol_adaptor->conn_trace_handler(conn->protocol_adaptor->user_context, conn, false);
            break;

        }

        qdr_connection_work_free_CT(work);
        work = DEQ_HEAD(work_list);
    }

    // Process the links_with_work array from highest to lowest priority.
    for (int priority = QDR_MAX_PRIORITY; priority >= 0; -- priority) {
        ref = DEQ_HEAD(links_with_work[priority]);
        while (ref) {
            qdr_link_work_t *link_work;
            detach_sent = false;
            link = ref->link;

            //
            // The work lock must be used to protect accesses to the link's work_list and
            // link_work->processing.
            //
            sys_mutex_lock(conn->work_lock);
            link_work = DEQ_HEAD(link->work_list);
            if (link_work) {
                // link_work ref transfered to local link_work
                DEQ_REMOVE_HEAD(link->work_list);
                link_work->processing = true;
            }
            sys_mutex_unlock(conn->work_lock);

            //
            // Handle disposition/settlement updates
            //
            qdr_delivery_ref_list_t updated_deliveries;
            sys_mutex_lock(conn->work_lock);
            DEQ_MOVE(link->updated_deliveries, updated_deliveries);
            sys_mutex_unlock(conn->work_lock);

            qdr_delivery_ref_t *dref = DEQ_HEAD(updated_deliveries);
            while (dref) {
                conn->protocol_adaptor->delivery_update_handler(conn->protocol_adaptor->user_context, dref->dlv, dref->dlv->disposition, dref->dlv->settled);
                qdr_delivery_decref(core, dref->dlv, "qdr_connection_process - remove from updated list");
                qdr_del_delivery_ref(&updated_deliveries, dref);
                dref = DEQ_HEAD(updated_deliveries);
                event_count++;
            }

            while (link_work) {
                switch (link_work->work_type) {
                case QDR_LINK_WORK_DELIVERY :
                    {
                        int count = conn->protocol_adaptor->push_handler(conn->protocol_adaptor->user_context, link, link_work->value);
                        assert(count <= link_work->value);
                        link_work->value -= count;
                        break;
                    }

                case QDR_LINK_WORK_FLOW :
                    if (link_work->value > 0)
                        conn->protocol_adaptor->flow_handler(conn->protocol_adaptor->user_context, link, link_work->value);
                    if      (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_SET)
                        conn->protocol_adaptor->drain_handler(conn->protocol_adaptor->user_context, link, true);
                    else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_CLEAR)
                        conn->protocol_adaptor->drain_handler(conn->protocol_adaptor->user_context, link, false);
                    else if (link_work->drain_action == QDR_LINK_WORK_DRAIN_ACTION_DRAINED)
                        conn->protocol_adaptor->drained_handler(conn->protocol_adaptor->user_context, link);
                    break;

                case QDR_LINK_WORK_FIRST_DETACH :
                case QDR_LINK_WORK_SECOND_DETACH :
                    conn->protocol_adaptor->detach_handler(conn->protocol_adaptor->user_context, link, link_work->error,
                                                           link_work->work_type == QDR_LINK_WORK_FIRST_DETACH,
                                                           link_work->close_link);
                    detach_sent = true;
                    break;
                }

                sys_mutex_lock(conn->work_lock);
                if (link_work->work_type == QDR_LINK_WORK_DELIVERY && link_work->value > 0 && !link->detach_received) {
                    // link_work ref transfered from link_work to work_list
                    DEQ_INSERT_HEAD(link->work_list, link_work);
                    link_work->processing = false;
                    link_work = 0; // Halt work processing
                } else {
                    qdr_link_work_release(link_work);
                    link_work = DEQ_HEAD(link->work_list);
                    if (link_work) {
                        // link_work ref transfered to local link_work
                        DEQ_REMOVE_HEAD(link->work_list);
                        link_work->processing = true;
                    }
                }
                sys_mutex_unlock(conn->work_lock);
                event_count++;
            }

            if (detach_sent) {
                // let the core thread know so it can clean up
                qdr_link_detach_sent(link);
            } else
                qdr_record_link_credit(core, link);

            ref = DEQ_NEXT(ref);
        }
    }

    sys_mutex_lock(conn->work_lock);
    for (int priority = QDR_MAX_PRIORITY; priority >= 0; -- priority) {
        ref = DEQ_HEAD(links_with_work[priority]);
        while (ref) {
            qdr_link_t *link = ref->link;

            link->processing = false;
            if (link->ready_to_free)
                qdr_link_processing_complete(core, link);

            qdr_del_link_ref(links_with_work + priority, ref->link, QDR_LINK_LIST_CLASS_LOCAL);
            ref = DEQ_HEAD(links_with_work[priority]);
        }
    }
    sys_mutex_unlock(conn->work_lock);

    return event_count;
}