in cpp/src/proactor_container_impl.cpp [560:740]
container::impl::dispatch_result container::impl::dispatch(pn_event_t* event) {
// If we have any pending connection work, do it now
pn_connection_t* c = pn_event_connection(event);
if (c) {
work_queue::impl* queue = connection_context::get(c).work_queue_.impl_.get();
queue->run_all_jobs();
}
// Process events that shouldn't be sent to messaging_handler
switch (pn_event_type(event)) {
case PN_PROACTOR_INACTIVE: /* listener and all connections closed */
// If we're stopping interrupt all other threads still running
if (auto_stop_) pn_proactor_interrupt(proactor_);
return ContinueLoop;
// We only interrupt to stop threads
case PN_PROACTOR_INTERRUPT: {
// Interrupt any other threads still running
GUARD(lock_);
if (threads_>1) pn_proactor_interrupt(proactor_);
return EndLoop;
}
case PN_PROACTOR_TIMEOUT: {
// Can get an immediate timeout, if we have a container event loop inject
run_timer_jobs();
// Run every container event loop job
// This is not at all efficient and single threads all these jobs, but it does correctly
// serialise them
work_queues queues;
{
GUARD(work_queues_lock_);
queues = work_queues_;
}
for (work_queues::iterator queue = queues.begin(); queue!=queues.end(); ++queue) {
(*queue)->run_all_jobs();
}
return EndBatch;
}
case PN_LISTENER_OPEN: {
pn_listener_t* l = pn_event_listener(event);
proton::listen_handler* handler;
{
GUARD(lock_);
listener_context &lc(listener_context::get(l));
handler = lc.listen_handler_;
}
if (handler) {
listener lstnr(l);
handler->on_open(lstnr);
}
return ContinueLoop;
}
case PN_LISTENER_ACCEPT: {
pn_listener_t* l = pn_event_listener(event);
pn_connection_t* c = pn_connection();
pn_connection_set_container(c, id_.c_str());
connection_options opts = server_connection_options_;
listen_handler* handler;
listener_context* lc;
const connection_options* options;
{
GUARD(lock_);
lc = &listener_context::get(l);
handler = lc->listen_handler_;
options = lc->connection_options_.get();
}
if (handler) {
listener lstr(l);
opts.update(handler->on_accept(lstr));
}
else if (options) opts.update(*options);
// Handler applied separately
connection_context& cc = connection_context::get(c);
cc.container = &container_;
cc.listener_context_ = lc;
cc.handler = opts.handler();
cc.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, c);
pn_transport_t* pnt = pn_transport();
pn_transport_set_server(pnt);
opts.apply_unbound_server(pnt);
pn_listener_accept2(l, c, pnt);
return ContinueLoop;
}
case PN_LISTENER_CLOSE: {
pn_listener_t* l = pn_event_listener(event);
proton::listen_handler* handler;
{
GUARD(lock_);
listener_context &lc(listener_context::get(l));
handler = lc.listen_handler_;
}
listener lstnr(l);
if (handler) {
pn_condition_t* c = pn_listener_condition(l);
if (pn_condition_is_set(c)) {
handler->on_error(lstnr, make_wrapper(c).what());
}
handler->on_close(lstnr);
}
return ContinueLoop;
}
// Connection driver will bind a new transport to the connection at this point
case PN_CONNECTION_INIT:
return ContinueLoop;
case PN_CONNECTION_REMOTE_OPEN: {
// This is the only event that we get indicating that the connection succeeded so
// it's the only place to reset the reconnection logic.
//
// Just note we have a connection then process normally
pn_connection_t* c = pn_event_connection(event);
reset_reconnect(c);
break;
}
case PN_CONNECTION_REMOTE_CLOSE: {
pn_connection_t *c = pn_event_connection(event);
pn_condition_t *cc = pn_connection_remote_condition(c);
// If reconnect is on, amqp:connection:forced should be treated specially:
// Hide the connection error/close events from the application;
// Then we close the connection noting the forced close;
// Then set up for reconnect handling.
if (get_reconnect_context(c) &&
pn_condition_is_set(cc) &&
!strcmp(pn_condition_get_name(cc), "amqp:connection:forced"))
{
pn_transport_t* t = pn_event_transport(event);
pn_condition_t* tc = pn_transport_condition(t);
pn_condition_copy(tc, cc);
pn_transport_close_tail(t);
pn_connection_close(c);
return ContinueLoop;
}
break;
}
case PN_TRANSPORT_CLOSED: {
// If reconnect is turned on then handle closed on error here with reconnect attempt
pn_connection_t* c = pn_event_connection(event);
pn_transport_t* t = pn_event_transport(event);
if (pn_condition_is_set(pn_transport_condition(t)) && can_reconnect(c)) {
messaging_handler *mh = get_handler(event);
if (mh) { // Notify handler of pending reconnect
transport trans = make_wrapper(t);
try {
mh->on_transport_error(trans);
} catch (const proton::error& e) {
// If this is the same error we are re-connecting for,
// ignore it. It was probably thrown by the default
// messaging_handler::on_error(), and if not the user has
// already seen it.
//
// If this isn't the same error, then something unexpected
// has happened, so re-throw.
if (std::string(e.what()) != trans.error().what())
throw;
}
}
// on_transport_error() may have closed the connection, check again.
reconnect_context* rc = get_reconnect_context(c);
if (rc && !(rc->stop_reconnect_)) {
setup_reconnect(c);
return ContinueLoop;
}
}
// Otherwise, this connection will be freed by the proactor.
// Mark its work_queue finished so it won't try to use the freed connection.
connection_context::get(c).work_queue_.impl_.get()->finished();
break;
}
default:
break;
}
messaging_handler *mh = get_handler(event);
if (mh) messaging_adapter::dispatch(*mh, event);
return ContinueLoop;
}