void HandlerBase::grabCnx()

in lib/HandlerBase.cc [102:149]


/**
 * Start an asynchronous attempt to obtain a connection for this handler.
 *
 * Only one attempt may be in flight at a time: `reconnectionPending_` is switched from
 * false to true with compare_exchange_strong, and the call returns immediately if the flag
 * was already set. The call also returns (after clearing the flag) when getCnx() still
 * yields a live connection, or when the owning client can no longer be locked.
 *
 * The outcome is delivered through listeners registered on the futures returned by
 * getConnection() and connectionOpened(); `reconnectionPending_` is cleared on each of
 * those paths before any reconnection is scheduled.
 *
 * @param assignedBrokerUrl forwarded unchanged to getConnection()
 */
void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl) {
    // Claim the "pending" flag; if it was already true, another attempt owns it.
    bool expectedState = false;
    if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
        LOG_INFO(getName() << "Ignoring reconnection attempt since there's already a pending reconnection");
        return;
    }

    if (getCnx().lock()) {
        LOG_INFO(getName() << "Ignoring reconnection request since we're already connected");
        reconnectionPending_ = false;
        return;
    }

    LOG_INFO(getName() << "Getting connection from pool");
    ClientImplPtr client = client_.lock();
    if (!client) {
        // The client is gone: report the failure and release the flag without scheduling a retry.
        LOG_WARN(getName() << "Client is invalid when calling grabCnx()");
        connectionFailed(ResultAlreadyClosed);
        reconnectionPending_ = false;
        return;
    }

    // Captured by the callbacks below (alongside `this`) so they hold a reference to this handler.
    auto self = shared_from_this();
    auto cnxFuture = getConnection(client, assignedBrokerUrl);

    using namespace std::chrono;
    // Use steady_clock for the elapsed-time measurement: high_resolution_clock is allowed to be
    // an alias of system_clock, in which case a wall-clock adjustment during the connect would
    // corrupt (or even negate) the computed duration.
    auto before = steady_clock::now();
    cnxFuture.addListener([this, self, before](Result result, const ClientConnectionPtr& cnx) {
        if (result == ResultOk) {
            connectionOpened(cnx).addListener([this, self, before](Result result, bool) {
                // Do not use bool, only Result.
                reconnectionPending_ = false;
                if (result == ResultOk) {
                    connectionTimeMs_ = duration_cast<milliseconds>(steady_clock::now() - before).count();
                    // Prevent the creationTimer_ from cancelling the timer_ in future
                    ASIO_ERROR ignored;
                    creationTimer_->cancel(ignored);
                    LOG_INFO("Finished connecting to broker after " << connectionTimeMs_ << " ms");
                } else if (isResultRetryable(result)) {
                    // Only retryable results from connectionOpened() trigger another attempt.
                    scheduleReconnection();
                }
            });
        } else {
            // Failure to get a connection: report it, release the flag, then schedule a retry.
            connectionFailed(result);
            reconnectionPending_ = false;
            scheduleReconnection();
        }
    });
}