container::impl::dispatch_result container::impl::dispatch()

in cpp/src/proactor_container_impl.cpp [560:740]


// Handle a single proactor event and tell the caller how to proceed.
//
// Processing happens in three stages:
//   1. If the event has an associated connection, every job queued on that
//      connection's work queue is run first.
//   2. Event types that must not (or must not yet) reach the application's
//      messaging_handler are handled in the switch below. Cases that `return`
//      are fully consumed here; cases that `break` fall through to stage 3.
//   3. The event is forwarded to the messaging_handler returned by
//      get_handler(event), if there is one.
//
// Parameters:
//   event - the proactor event to process.
//
// Returns:
//   EndLoop      for PN_PROACTOR_INTERRUPT,
//   EndBatch     for PN_PROACTOR_TIMEOUT,
//   ContinueLoop for every other event.
//   NOTE(review): how the caller reacts to each value is not visible in this
//   function - confirm against the run loop that calls dispatch().
//
// Exceptions:
//   Nothing thrown by listen_handler / messaging_handler callbacks is caught
//   here, with one exception: a proton::error thrown by on_transport_error()
//   during reconnect handling is swallowed when its message equals the
//   transport's own error message (see PN_TRANSPORT_CLOSED).
container::impl::dispatch_result container::impl::dispatch(pn_event_t* event) {

    // If we have any pending connection work, do it now
    pn_connection_t* c = pn_event_connection(event);
    if (c) {
        // NOTE(review): the work queue pointer is dereferenced without a null check;
        // this assumes every connection seen here already has a work queue in its
        // connection_context - confirm.
        work_queue::impl* queue = connection_context::get(c).work_queue_.impl_.get();
        queue->run_all_jobs();
    }

    // Process events that shouldn't be sent to messaging_handler
    switch (pn_event_type(event)) {

    case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
        // If we're stopping interrupt all other threads still running
        if (auto_stop_) pn_proactor_interrupt(proactor_);
        return ContinueLoop;

    // We only interrupt to stop threads
    case PN_PROACTOR_INTERRUPT: {
        // Interrupt any other threads still running
        // threads_ is read with lock_ guarded; the interrupt is re-issued only while
        // more than one thread is counted, so it is passed on from thread to thread.
        GUARD(lock_);
        if (threads_>1) pn_proactor_interrupt(proactor_);
        return EndLoop;
    }

    case PN_PROACTOR_TIMEOUT: {
        // Can get an immediate timeout, if we have a container event loop inject
        run_timer_jobs();

        // Run every container event loop job
        // This is not at all efficient and single threads all these jobs, but it does correctly
        // serialise them
        work_queues queues;
        {
            // Copy the set of queues under work_queues_lock_ so the jobs below run
            // from the local copy, outside this guarded scope.
            GUARD(work_queues_lock_);
            queues = work_queues_;
        }
        for (work_queues::iterator queue = queues.begin(); queue!=queues.end(); ++queue) {
            (*queue)->run_all_jobs();
        }
        return EndBatch;
    }
    case PN_LISTENER_OPEN: {
        pn_listener_t* l = pn_event_listener(event);
        proton::listen_handler* handler;
        {
            // Read the handler pointer from the listener context under lock_; the
            // callback itself is invoked after this guarded scope has ended.
            GUARD(lock_);
            listener_context &lc(listener_context::get(l));
            handler = lc.listen_handler_;
        }
        if (handler) {
            listener lstnr(l);
            handler->on_open(lstnr);
        }
        return ContinueLoop;
    }
    case PN_LISTENER_ACCEPT: {
        pn_listener_t* l = pn_event_listener(event);
        // This `c` is a newly created connection for the incoming peer; it shadows
        // the event connection `c` declared at the top of the function.
        pn_connection_t* c = pn_connection();
        pn_connection_set_container(c, id_.c_str());
        // Start from the container-wide server options, then layer listener-specific
        // options on top.
        connection_options opts = server_connection_options_;
        listen_handler* handler;
        listener_context* lc;
        const connection_options* options;
        {
            // Snapshot the listener's handler and options pointers under lock_.
            GUARD(lock_);
            lc = &listener_context::get(l);
            handler = lc->listen_handler_;
            options = lc->connection_options_.get();
        }
        // A listen_handler takes precedence: its on_accept() result is used and the
        // listener's stored connection options are consulted only when there is no handler.
        if (handler) {
            listener lstr(l);
            opts.update(handler->on_accept(lstr));
        }
        else if (options) opts.update(*options);
        // Handler applied separately
        connection_context& cc = connection_context::get(c);
        cc.container = &container_;
        cc.listener_context_ = lc;
        cc.handler = opts.handler();
        // NOTE(review): presumably work_queue_ takes ownership of this allocation - confirm.
        cc.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, c);
        // Create a server-side transport, apply the options to it while it is still
        // unbound, then hand connection and transport to the listener to accept.
        pn_transport_t* pnt = pn_transport();
        pn_transport_set_server(pnt);
        opts.apply_unbound_server(pnt);
        pn_listener_accept2(l, c, pnt);
        return ContinueLoop;
    }
    case PN_LISTENER_CLOSE: {
        pn_listener_t* l = pn_event_listener(event);
        proton::listen_handler* handler;
        {
            // Read the handler pointer under lock_; callbacks run after the guarded scope.
            GUARD(lock_);
            listener_context &lc(listener_context::get(l));
            handler = lc.listen_handler_;
        }
        listener lstnr(l);
        if (handler) {
            // `c` here is the listener's condition (shadowing the event connection).
            // If a condition is set, on_error() is reported first; on_close() is
            // always called afterwards.
            pn_condition_t* c = pn_listener_condition(l);
            if (pn_condition_is_set(c)) {
                handler->on_error(lstnr, make_wrapper(c).what());
            }
            handler->on_close(lstnr);
        }
        return ContinueLoop;
    }
    // Connection driver will bind a new transport to the connection at this point
    // The event is consumed here and is not forwarded to the messaging_handler.
    case PN_CONNECTION_INIT:
        return ContinueLoop;

    case PN_CONNECTION_REMOTE_OPEN: {
        // This is the only event that we get indicating that the connection succeeded so
        // it's the only place to reset the reconnection logic.
        //
        // Just note we have a connection then process normally
        pn_connection_t* c = pn_event_connection(event);
        reset_reconnect(c);
        break;
    }
    case PN_CONNECTION_REMOTE_CLOSE: {
        pn_connection_t *c = pn_event_connection(event);
        // `cc` is the remote peer's close condition (not a connection_context).
        pn_condition_t *cc = pn_connection_remote_condition(c);

        // If reconnect is on, amqp:connection:forced should be treated specially:
        // Hide the connection error/close events from the application;
        // Then we close the connection noting the forced close;
        // Then set up for reconnect handling.
        if (get_reconnect_context(c) &&
            pn_condition_is_set(cc) &&
            !strcmp(pn_condition_get_name(cc), "amqp:connection:forced"))
        {
            // Copy the remote condition onto the transport, close the transport tail
            // and close the connection locally. Returning here (rather than breaking)
            // skips the messaging_handler dispatch at the end of the function.
            pn_transport_t* t = pn_event_transport(event);
            pn_condition_t* tc = pn_transport_condition(t);
            pn_condition_copy(tc, cc);
            pn_transport_close_tail(t);
            pn_connection_close(c);
            return ContinueLoop;
        }
        break;
    }
    case PN_TRANSPORT_CLOSED: {
        // If reconnect is turned on then handle closed on error here with reconnect attempt
        pn_connection_t* c = pn_event_connection(event);
        pn_transport_t* t = pn_event_transport(event);
        // Only a transport closed with an error condition set, on a connection that
        // can_reconnect() accepts, enters the reconnect path.
        if (pn_condition_is_set(pn_transport_condition(t)) && can_reconnect(c)) {
            messaging_handler *mh = get_handler(event);
            if (mh) {           // Notify handler of pending reconnect
                transport trans = make_wrapper(t);
                try {
                    mh->on_transport_error(trans);
                } catch (const proton::error& e) {
                    // If this is the same error we are re-connecting for,
                    // ignore it.  It was probably thrown by the default
                    // messaging_handler::on_error(), and if not the user has
                    // already seen it.
                    //
                    // If this isn't the same error, then something unexpected
                    // has happened, so re-throw.
                    if (std::string(e.what()) != trans.error().what())
                        throw;
                }
            }
            // on_transport_error() may have closed the connection, check again.
            reconnect_context* rc = get_reconnect_context(c);
            if (rc && !(rc->stop_reconnect_)) {
                // Reconnect is still wanted: schedule it and consume the event so the
                // messaging_handler dispatch below is skipped for this event.
                setup_reconnect(c);
                return ContinueLoop;
            }
        }
        // Otherwise, this connection will be freed by the proactor.
        // Mark its work_queue finished so it won't try to use the freed connection.
        connection_context::get(c).work_queue_.impl_.get()->finished();
        break;
    }
    default:
        // Every other event type goes straight to the messaging_handler dispatch below.
        break;
    }

    // Reached only by cases that `break`: forward the event to the messaging_handler
    // that get_handler() returns for it, if any.
    messaging_handler *mh = get_handler(event);
    if (mh) messaging_adapter::dispatch(*mh, event);
    return ContinueLoop;
}