cpp/src/proactor_container_impl.cpp (614 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "proactor_container_impl.hpp"
#include "proactor_work_queue_impl.hpp"

#include "connect_config.hpp"
#include "proton/error_condition.hpp"
#include "proton/listen_handler.hpp"
#include "proton/listener.hpp"
#include "proton/reconnect_options.hpp"
#include "proton/ssl.hpp"
#include "proton/transport.hpp"
#include "proton/url.hpp"

#include "proton/connection.h"
#include "proton/listener.h"
#include "proton/proactor.h"
#include "proton/transport.h"

#include "contexts.hpp"
#include "messaging_adapter.hpp"
#include "reconnect_options_impl.hpp"
#include "proton_bits.hpp"

#include <assert.h>
#include <string.h>

#include <algorithm>
#include <vector>
#include <thread>
#include <random>

// XXXX: Debug
//#include <iostream>

namespace proton {

// Shared base of the per-connection and container-wide work queues.
// It owns the pending job list (guarded by lock_) and, via running_, makes sure
// the jobs of a single queue are never run by two threads at once.
class container::impl::common_work_queue : public work_queue::impl {
  public:
    common_work_queue(container::impl& c): container_(c), finished_(false), running_(false) {}

    typedef std::vector<work> jobs;

    void run_all_jobs();
    void finished() { GUARD(lock_); finished_ = true; }
    void schedule(duration, work);

    MUTEX(lock_)
    container::impl& container_;
    jobs jobs_;
    bool finished_;
    bool running_;
};

// Arrange for f to be added to this queue after delay d, using the container's
// timer. Does nothing if the queue has already been marked finished.
void container::impl::common_work_queue::schedule(duration d, work f) {
    // Note this is an unbounded work queue.
    // A resource-safe implementation should be bounded.
    {
        // finished_ is written under lock_ by finished(), so it must also be read
        // under lock_. The lock is dropped before calling into the container's
        // scheduler, which takes its own lock.
        GUARD(lock_);
        if (finished_) return;
    }
    container_.schedule(d, make_work(&work_queue::impl::add_void, static_cast<work_queue::impl*>(this), f));
}

// Run every job queued so far. Jobs added while running are left for the next call.
// Exceptions thrown by jobs are deliberately ignored.
void container::impl::common_work_queue::run_all_jobs() {
    jobs j;
    // Lock this operation for mt
    {
        GUARD(lock_);
        // Ensure that we never run work from this queue concurrently
        if (running_) return;
        running_ = true;
        // But allow adding to the queue concurrently to running
        std::swap(j, jobs_);
    }
    // Run queued work, but ignore any exceptions
    for (jobs::iterator f = j.begin(); f != j.end(); ++f) try {
        (*f)();
    } catch (...) {};
    {
        GUARD(lock_);
        running_ = false;
    }
    return;
}

// Work queue bound to one connection. Adding work wakes the connection so the
// proactor delivers an event for it, at which point the queued jobs are run.
class container::impl::connection_work_queue : public common_work_queue {
  public:
    connection_work_queue(container::impl& ct, pn_connection_t* c): common_work_queue(ct), connection_(c) {}

    bool add(work f);

    pn_connection_t* connection_;
};

// Queue f for this connection. Returns false (and drops f) once the queue is finished.
bool container::impl::connection_work_queue::add(work f) {
    // Note this is an unbounded work queue.
    // A resource-safe implementation should be bounded.
    GUARD(lock_);
    if (finished_) return false;
    jobs_.push_back(f);
    pn_connection_wake(connection_);
    return true;
}

// Container-wide work queue. It is registered with the container on creation
// (add_work_queue) and unregisters itself on destruction.
class container::impl::container_work_queue : public common_work_queue {
  public:
    container_work_queue(container::impl& c): common_work_queue(c) {}
    ~container_work_queue() { container_.remove_work_queue(this); }

    bool add(work f);
};

// Queue f and force an immediate proactor timeout so a container thread picks it up.
// Returns false (and drops f) once the queue is finished.
bool container::impl::container_work_queue::add(work f) {
    // Note this is an unbounded work queue.
    // A resource-safe implementation should be bounded.
    GUARD(lock_);
    if (finished_) return false;
    jobs_.push_back(f);
    pn_proactor_set_timeout(container_.proactor_, 0);
    return true;
}

class work_queue::impl* container::impl::make_work_queue(container& c) {
    return c.impl_->add_work_queue();
}

container::impl::impl(container& c, const std::string& id, messaging_handler* mh)
    : threads_(0), container_(c), proactor_(pn_proactor()), handler_(mh), id_(id),
      reconnecting_(0), auto_stop_(true), stopping_(false)
{}

container::impl::~impl() {
    pn_proactor_free(proactor_);
}

// Create a new container-wide work queue and register it so that timeout
// events run its jobs.
container::impl::container_work_queue* container::impl::add_work_queue() {
    container_work_queue* c = new container_work_queue(*this);
    GUARD(work_queues_lock_);
    work_queues_.insert(c);
    return c;
}

// Unregister a container-wide work queue (called from its destructor).
void container::impl::remove_work_queue(container::impl::container_work_queue* l) {
    GUARD(work_queues_lock_);
    work_queues_.erase(l);
}

namespace {
// Fill opts with defaults derived from url: virtual host, optional user and
// password, and default client TLS settings for the amqps scheme.
void default_url_options(connection_options& opts, const url& url) {
    opts.virtual_host(url.host());
    if (!url.user().empty())
        opts.user(url.user());
    if (!url.password().empty())
        opts.password(url.password());

    // If scheme is amqps then use default tls settings
    if (url.scheme()==url.AMQPS) {
        opts.ssl_client_options(ssl_client_options());
    }
}
}

// Create and open (locally) a client connection for url. Options are layered as:
// container id, url defaults, container client options, then user_opts.
// Callers in this file hold lock_ while calling it.
// Throws proton::error if the container is stopping.
// Returns the new pn_connection_t carrying the single reference from pn_connection().
pn_connection_t* container::impl::make_connection_lh(
    const url& url,
    const connection_options& user_opts)
{
    if (stopping_)
        throw proton::error("container is stopping");

    connection_options opts;
    opts.container_id(id_);
    default_url_options(opts, url);
    opts.update(client_connection_options_);
    opts.update(user_opts);
    messaging_handler* mh = opts.handler();

    pn_connection_t *pnc = pn_connection();
    connection_context& cc(connection_context::get(pnc));
    cc.container = &container_;
    cc.handler = mh;
    cc.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, pnc);
    cc.reconnect_url_ = url;
    cc.active_url_ = url;
    cc.connection_options_.reset(new connection_options(opts));

    make_wrapper(pnc).open(*cc.connection_options_);

    return pnc; // 1 refcount from pn_connection()
}
ownership of pnc // // NOTE: After the call to start_connection() pnc is active in a proactor thread, // and may even have been freed already. It is undefined to use pnc (or any // object belonging to it) except in appropriate handlers. // // SUBTLE NOTE: There must not be any proton::object wrappers in scope when // start_connection() is called. The wrapper destructor will call pn_decref() // after start_connection() which is undefined! // void container::impl::start_connection(const url& url, pn_connection_t *pnc) { pn_transport_t* pnt = pn_transport(); connection_context& cc = connection_context::get(pnc); connection_options& co = *cc.connection_options_; co.apply_unbound_client(pnt); char caddr[PN_MAX_ADDR]; pn_proactor_addr(caddr, sizeof(caddr), url.host().c_str(), url.port().c_str()); pn_proactor_connect2(proactor_, pnc, pnt, caddr); // Takes ownership of pnc, pnt } void container::impl::reconnect(pn_connection_t* pnc) { --reconnecting_; if (stopping_ && reconnecting_==0) { pn_connection_free(pnc); //TODO: We've lost the error - we should really propagate it here pn_proactor_disconnect(proactor_, NULL); return; } connection_context& cc = connection_context::get(pnc); reconnect_context& rc = *cc.reconnect_context_.get(); connection_options& co = *cc.connection_options_; co.apply_reconnect_urls(pnc); // Figure out next connection url to try // rc.current_url_ == -1 means try the url specified in connect, not a failover url const proton::url url(rc.current_url_==-1 ? cc.reconnect_url_ : cc.failover_urls_[rc.current_url_]); // XXXX Debug: //std::cout << "Retries: " << rc.retries_ << " Delay: " << rc.delay_ << " Trying: " << url << "@" << rc.current_url_ << std::endl; ++rc.current_url_; // Did we go through all the urls? 
if (rc.current_url_==int(cc.failover_urls_.size())) { rc.current_url_ = -1; ++rc.retries_; } connection_options opts; opts.container_id(id_); default_url_options(opts, url); opts.update(co); messaging_handler* mh = opts.handler(); cc.handler = mh; cc.active_url_ = url; make_wrapper(pnc).open(co); start_connection(url, pnc); } namespace { duration random_between(duration min, duration max) { static thread_local std::default_random_engine gen; std::uniform_int_distribution<duration::numeric_type> dist{min.milliseconds(), max.milliseconds()}; return duration(dist(gen)); } duration next_delay(reconnect_context& rc) { // If we've not retried before do it immediately if (rc.retries_==0) return duration(0); // If we haven't tried all failover urls yet this round do it immediately if (rc.current_url_!=-1) return duration(0); const reconnect_options_base& roi = rc.reconnect_options_; if (rc.retries_==1) { rc.delay_ = roi.delay; } else { rc.delay_ = std::min(roi.max_delay, rc.delay_ * roi.delay_multiplier); } return random_between(roi.delay, rc.delay_); } inline reconnect_context* get_reconnect_context(pn_connection_t* pnc) { return connection_context::get(pnc).reconnect_context_.get(); } void reset_reconnect(pn_connection_t* pnc) { reconnect_context* rc = get_reconnect_context(pnc); if (!rc) return; rc->delay_ = 0; rc->retries_ = 0; // set retry to the initial url next rc->current_url_ = -1; } } bool container::impl::can_reconnect(pn_connection_t* pnc) { reconnect_context* rc = get_reconnect_context(pnc); // If reconnect not enabled just fail if (!rc) return false; // Don't reconnect if we are locally closed, the application will // not expect a connection it closed to re-open. if (rc->stop_reconnect_) return false; // If container stopping don't try to reconnect // - we pretend to have set up a reconnect attempt so // that the proactor disconnect will finish and we will exit // the run loop without error. 
{ GUARD(lock_); if (stopping_) return true; } const reconnect_options_base& roi = rc->reconnect_options_; pn_transport_t* t = pn_connection_transport(pnc); pn_condition_t* condition = pn_transport_condition(t); // If we failed to authenticate then don't reconnect any more and just fail if ( !strcmp(pn_condition_get_name(condition), "amqp:unauthorized-access") ) return false; // If too many reconnect attempts just fail if ( roi.max_attempts != 0 && rc->retries_ >= roi.max_attempts) { pn_condition_format(condition, "proton:io", "Too many reconnect attempts (%d)", rc->retries_); return false; } return true; } void container::impl::setup_reconnect(pn_connection_t* pnc) { connection_context& cc = connection_context::get(pnc); reconnect_context* rc = cc.reconnect_context_.get(); if (!rc) return; rc->reconnected_ = true; // Recover connection from proactor pn_proactor_release_connection(pnc); // Figure out delay till next reconnect duration delay = next_delay(*rc); // Schedule reconnect - can do this on container work queue as no one can have the connection // now anyway schedule(delay, make_work(&container::impl::reconnect, this, pnc)); ++reconnecting_; } returned<connection> container::impl::connect( const std::string& addr, const proton::connection_options& user_opts) { proton::url url(addr); pn_connection_t* pnc = 0; { GUARD(lock_); pnc = make_connection_lh(url, user_opts); } start_connection(url, pnc); // See comment on start_connection return make_returned<proton::connection>(pnc); } returned<connection> container::impl::connect() { connection_options opts; std::string addr = apply_config(opts); return connect(addr, opts); } returned<sender> container::impl::open_sender(const std::string &urlstr, const proton::sender_options &o1, const connection_options &o2) { proton::url url(urlstr); pn_link_t* pnl = 0; pn_connection_t* pnc = 0; { GUARD(lock_); proton::sender_options lopts(sender_options_); lopts.update(o1); pnc = make_connection_lh(url, o2); connection 
conn(make_wrapper(pnc)); pnl = unwrap(conn.default_session().open_sender(url.path(), lopts)); } start_connection(url, pnc); // See comment on start_connection return make_returned<sender>(pnl); // Unsafe returned pointer } returned<receiver> container::impl::open_receiver(const std::string &urlstr, const proton::receiver_options &o1, const connection_options &o2) { proton::url url(urlstr); pn_link_t* pnl = 0; pn_connection_t* pnc = 0; { GUARD(lock_); proton::receiver_options lopts(receiver_options_); lopts.update(o1); pnc = make_connection_lh(url, o2); connection conn(make_wrapper(pnc)); pnl = unwrap(conn.default_session().open_receiver(url.path(), lopts)); } start_connection(url, pnc); // See comment on start_connection return make_returned<receiver>(pnl); } pn_listener_t* container::impl::listen_common_lh(const std::string& addr) { if (stopping_) throw proton::error("container is stopping"); proton::url url(addr, false); // Don't want un-helpful defaults like "localhost" // Figure out correct string len then create connection address int len = pn_proactor_addr(0, 0, url.host().c_str(), url.port().c_str()); std::vector<char> caddr(len+1); pn_proactor_addr(&caddr[0], len+1, url.host().c_str(), url.port().c_str()); pn_listener_t* listener = pn_listener(); pn_listener_set_context(listener, &container_); pn_proactor_listen(proactor_, listener, &caddr[0], 16); return listener; } proton::listener container::impl::listen(const std::string& addr) { GUARD(lock_); pn_listener_t* listener = listen_common_lh(addr); return proton::listener(listener); } proton::listener container::impl::listen(const std::string& addr, const proton::connection_options& opts) { GUARD(lock_); pn_listener_t* listener = listen_common_lh(addr); listener_context& lc=listener_context::get(listener); lc.connection_options_.reset(new connection_options(opts)); return proton::listener(listener); } proton::listener container::impl::listen(const std::string& addr, proton::listen_handler& lh) { GUARD(lock_); 
pn_listener_t* listener = listen_common_lh(addr); listener_context& lc=listener_context::get(listener); lc.listen_handler_ = &lh; return proton::listener(listener); } work_handle container::impl::schedule(duration delay, work f) { GUARD(deferred_lock_); timestamp now = timestamp::now(); // Record timeout; Add callback to timeout sorted list scheduled s = {now+delay, f, current_work_handle_}; ++current_work_handle_; deferred_.push_back(s); std::push_heap(deferred_.begin(), deferred_.end()); // Set timeout for current head of timeout queue scheduled* next = &deferred_.front(); pn_millis_t timeout_ms = (now < next->time) ? (next->time-now).milliseconds() : 0; pn_proactor_set_timeout(proactor_, timeout_ms); is_active_.insert(s.w_handle); return s.w_handle; } void container::impl::cancel(work_handle work_handle) { GUARD(deferred_lock_); is_active_.erase(work_handle); } void container::impl::client_connection_options(const connection_options &opts) { GUARD(lock_); client_connection_options_ = opts; } void container::impl::server_connection_options(const connection_options &opts) { GUARD(lock_); server_connection_options_ = opts; } void container::impl::sender_options(const proton::sender_options &opts) { GUARD(lock_); sender_options_ = opts; } void container::impl::receiver_options(const proton::receiver_options &opts) { GUARD(lock_); receiver_options_ = opts; } void container::impl::run_timer_jobs() { timestamp now = timestamp::now(); std::vector<scheduled> tasks; // We first extract all the runnable tasks and then run them - this is to avoid having tasks // injected as we are running them (which could potentially never end) { GUARD(deferred_lock_); // Figure out how many tasks we need to execute and pop them to the back of the // queue (in reverse order) unsigned i = 0; for (;;) { // Have we seen all the queued tasks? if ( deferred_.size()-i==0 ) break; // Is the next task in the future? 
timestamp next_time = deferred_.front().time; if ( next_time>now ) { pn_proactor_set_timeout(proactor_, (next_time-now).milliseconds()); break; } std::pop_heap(deferred_.begin(), deferred_.end()-i); ++i; } // Nothing to do if ( i==0 ) return; // Now we know how many tasks to run if ( deferred_.size()==i ) { // If we sorted the entire heap, then we're executing every task // so don't need to copy and can just swap tasks.swap(deferred_); } else { // Otherwise just copy the ones we sorted tasks = std::vector<scheduled>(deferred_.end()-i, deferred_.end()); // Remove tasks to be executed deferred_.resize(deferred_.size()-i); } } // We've now taken the tasks to run from the deferred tasks // so we can run them unlocked // NB. We copied the due tasks in reverse order so execute from end for (int i = tasks.size()-1; i>=0; --i) { if(is_active_.count(tasks[i].w_handle)) { tasks[i].task(); is_active_.erase(tasks[i].w_handle); } } } // Return true if this thread is finished container::impl::dispatch_result container::impl::dispatch(pn_event_t* event) { // If we have any pending connection work, do it now pn_connection_t* c = pn_event_connection(event); if (c) { work_queue::impl* queue = connection_context::get(c).work_queue_.impl_.get(); queue->run_all_jobs(); } // Process events that shouldn't be sent to messaging_handler switch (pn_event_type(event)) { case PN_PROACTOR_INACTIVE: /* listener and all connections closed */ // If we're stopping interrupt all other threads still running if (auto_stop_) pn_proactor_interrupt(proactor_); return ContinueLoop; // We only interrupt to stop threads case PN_PROACTOR_INTERRUPT: { // Interrupt any other threads still running GUARD(lock_); if (threads_>1) pn_proactor_interrupt(proactor_); return EndLoop; } case PN_PROACTOR_TIMEOUT: { // Can get an immediate timeout, if we have a container event loop inject run_timer_jobs(); // Run every container event loop job // This is not at all efficient and single threads all these jobs, but it does 
correctly // serialise them work_queues queues; { GUARD(work_queues_lock_); queues = work_queues_; } for (work_queues::iterator queue = queues.begin(); queue!=queues.end(); ++queue) { (*queue)->run_all_jobs(); } return EndBatch; } case PN_LISTENER_OPEN: { pn_listener_t* l = pn_event_listener(event); proton::listen_handler* handler; { GUARD(lock_); listener_context &lc(listener_context::get(l)); handler = lc.listen_handler_; } if (handler) { listener lstnr(l); handler->on_open(lstnr); } return ContinueLoop; } case PN_LISTENER_ACCEPT: { pn_listener_t* l = pn_event_listener(event); pn_connection_t* c = pn_connection(); pn_connection_set_container(c, id_.c_str()); connection_options opts = server_connection_options_; listen_handler* handler; listener_context* lc; const connection_options* options; { GUARD(lock_); lc = &listener_context::get(l); handler = lc->listen_handler_; options = lc->connection_options_.get(); } if (handler) { listener lstr(l); opts.update(handler->on_accept(lstr)); } else if (options) opts.update(*options); // Handler applied separately connection_context& cc = connection_context::get(c); cc.container = &container_; cc.listener_context_ = lc; cc.handler = opts.handler(); cc.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, c); pn_transport_t* pnt = pn_transport(); pn_transport_set_server(pnt); opts.apply_unbound_server(pnt); pn_listener_accept2(l, c, pnt); return ContinueLoop; } case PN_LISTENER_CLOSE: { pn_listener_t* l = pn_event_listener(event); proton::listen_handler* handler; { GUARD(lock_); listener_context &lc(listener_context::get(l)); handler = lc.listen_handler_; } listener lstnr(l); if (handler) { pn_condition_t* c = pn_listener_condition(l); if (pn_condition_is_set(c)) { handler->on_error(lstnr, make_wrapper(c).what()); } handler->on_close(lstnr); } return ContinueLoop; } // Connection driver will bind a new transport to the connection at this point case PN_CONNECTION_INIT: return ContinueLoop; case 
PN_CONNECTION_REMOTE_OPEN: { // This is the only event that we get indicating that the connection succeeded so // it's the only place to reset the reconnection logic. // // Just note we have a connection then process normally pn_connection_t* c = pn_event_connection(event); reset_reconnect(c); break; } case PN_CONNECTION_REMOTE_CLOSE: { pn_connection_t *c = pn_event_connection(event); pn_condition_t *cc = pn_connection_remote_condition(c); // If reconnect is on, amqp:connection:forced should be treated specially: // Hide the connection error/close events from the application; // Then we close the connection noting the forced close; // Then set up for reconnect handling. if (get_reconnect_context(c) && pn_condition_is_set(cc) && !strcmp(pn_condition_get_name(cc), "amqp:connection:forced")) { pn_transport_t* t = pn_event_transport(event); pn_condition_t* tc = pn_transport_condition(t); pn_condition_copy(tc, cc); pn_transport_close_tail(t); pn_connection_close(c); return ContinueLoop; } break; } case PN_TRANSPORT_CLOSED: { // If reconnect is turned on then handle closed on error here with reconnect attempt pn_connection_t* c = pn_event_connection(event); pn_transport_t* t = pn_event_transport(event); if (pn_condition_is_set(pn_transport_condition(t)) && can_reconnect(c)) { messaging_handler *mh = get_handler(event); if (mh) { // Notify handler of pending reconnect transport trans = make_wrapper(t); try { mh->on_transport_error(trans); } catch (const proton::error& e) { // If this is the same error we are re-connecting for, // ignore it. It was probably thrown by the default // messaging_handler::on_error(), and if not the user has // already seen it. // // If this isn't the same error, then something unexpected // has happened, so re-throw. if (std::string(e.what()) != trans.error().what()) throw; } } // on_transport_error() may have closed the connection, check again. 
reconnect_context* rc = get_reconnect_context(c); if (rc && !(rc->stop_reconnect_)) { setup_reconnect(c); return ContinueLoop; } } // Otherwise, this connection will be freed by the proactor. // Mark its work_queue finished so it won't try to use the freed connection. connection_context::get(c).work_queue_.impl_.get()->finished(); break; } default: break; } messaging_handler *mh = get_handler(event); if (mh) messaging_adapter::dispatch(*mh, event); return ContinueLoop; } // Figure out the handler for the primary object for event messaging_handler* container::impl::get_handler(pn_event_t *event) { messaging_handler *mh = 0; // First try for a link (send/receiver) handler pn_link_t *link = pn_event_link(event); if (link) mh = get_handler(link); // Try for session handler if no link handler pn_session_t *session = pn_event_session(event); if (session && !mh) mh = get_handler(session); // Try for connection handler if none of the above pn_connection_t *connection = pn_event_connection(event); if (connection && !mh) mh = get_handler(connection); // Use container handler if nothing more specific (must be a container handler) return mh ? mh : handler_; } void container::impl::thread() { bool finished; { GUARD(lock_); ++threads_; finished = stopping_; } while (!finished) { pn_event_batch_t *events = pn_proactor_wait(proactor_); pn_event_t *e; error_condition error; try { while ((e = pn_event_batch_next(events))) { dispatch_result r = dispatch(e); finished = r==EndLoop; if (r!=ContinueLoop) break; } } catch (const std::exception& e) { // If we caught an exception then shutdown the (other threads of the) container error = error_condition("exception", e.what()); } catch (...) 
{ error = error_condition("exception", "container shut-down by unknown exception"); } pn_proactor_done(proactor_, events); if (!error.empty()) { finished = true; { GUARD(lock_); disconnect_error_ = error; } stop(error); } } { GUARD(lock_); --threads_; } } void container::impl::start_event() { if (handler_) handler_->on_container_start(container_); } void container::impl::stop_event() { if (handler_) handler_->on_container_stop(container_); } void container::impl::run(int threads) { // Have to "manually" generate container events CALL_ONCE(start_once_, &impl::start_event, this); // Run handler threads threads = std::max(threads, 1); // Ensure at least 1 thread typedef std::vector<std::thread*> vt; // pointer vector to work around failures in older compilers vt ts(threads-1); for (vt::iterator i = ts.begin(); i != ts.end(); ++i) { *i = new std::thread(&impl::thread, this); } thread(); // Use this thread too. // Wait for the other threads to stop for (vt::iterator i = ts.begin(); i != ts.end(); ++i) { (*i)->join(); delete *i; } bool last = false; { GUARD(lock_); last = threads_==0; } if (last) CALL_ONCE(stop_once_, &impl::stop_event, this); // Throw an exception if we disconnected the proactor because of an exception { GUARD(lock_); if (!disconnect_error_.empty()) throw proton::error(disconnect_error_.description()); }; } void container::impl::auto_stop(bool set) { GUARD(lock_); auto_stop_ = set; } void container::impl::stop(const proton::error_condition& err) { { GUARD(lock_); if (stopping_) return; // Already stopping auto_stop_ = true; stopping_ = true; // Have to wait until actual reconnect to stop or we leak the connection if (reconnecting_>0) return; } pn_condition_t* error_condition = pn_condition(); set_error_condition(err, error_condition); pn_proactor_disconnect(proactor_, error_condition); pn_condition_free(error_condition); } }