in lib/HandlerBase.cc [102:149]
void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl) {
bool expectedState = false;
if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) {
LOG_INFO(getName() << "Ignoring reconnection attempt since there's already a pending reconnection");
return;
}
if (getCnx().lock()) {
LOG_INFO(getName() << "Ignoring reconnection request since we're already connected");
reconnectionPending_ = false;
return;
}
LOG_INFO(getName() << "Getting connection from pool");
ClientImplPtr client = client_.lock();
if (!client) {
LOG_WARN(getName() << "Client is invalid when calling grabCnx()");
connectionFailed(ResultAlreadyClosed);
reconnectionPending_ = false;
return;
}
auto self = shared_from_this();
auto cnxFuture = getConnection(client, assignedBrokerUrl);
using namespace std::chrono;
auto before = high_resolution_clock::now();
cnxFuture.addListener([this, self, before](Result result, const ClientConnectionPtr& cnx) {
if (result == ResultOk) {
connectionOpened(cnx).addListener([this, self, before](Result result, bool) {
// Do not use bool, only Result.
reconnectionPending_ = false;
if (result == ResultOk) {
connectionTimeMs_ =
duration_cast<milliseconds>(high_resolution_clock::now() - before).count();
// Prevent the creationTimer_ from cancelling the timer_ in future
ASIO_ERROR ignored;
creationTimer_->cancel(ignored);
LOG_INFO("Finished connecting to broker after " << connectionTimeMs_ << " ms")
} else if (isResultRetryable(result)) {
scheduleReconnection();
}
});
} else {
connectionFailed(result);
reconnectionPending_ = false;
scheduleReconnection();
}
});
}