in lib/ConnectionPool.cc [64:115]
Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
const std::string& logicalAddress, const std::string& physicalAddress) {
if (closed_) {
Promise<Result, ClientConnectionWeakPtr> promise;
promise.setFailed(ResultAlreadyClosed);
return promise.getFuture();
}
std::unique_lock<std::mutex> lock(mutex_);
if (poolConnections_) {
PoolMap::iterator cnxIt = pool_.find(logicalAddress);
if (cnxIt != pool_.end()) {
ClientConnectionPtr cnx = cnxIt->second.lock();
if (cnx && !cnx->isClosed()) {
// Found a valid or pending connection in the pool
LOG_DEBUG("Got connection from pool for " << logicalAddress << " use_count: " //
<< (cnx.use_count() - 1) << " @ " << cnx.get());
return cnx->getConnectFuture();
} else {
// Deleting stale connection
LOG_INFO("Deleting stale connection from pool for "
<< logicalAddress << " use_count: " << (cnx.use_count() - 1) << " @ " << cnx.get());
pool_.erase(logicalAddress);
}
}
}
// No valid or pending connection found in the pool, creating a new one
ClientConnectionPtr cnx;
try {
cnx.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(),
clientConfiguration_, authentication_, clientVersion_));
} catch (const std::runtime_error& e) {
lock.unlock();
LOG_ERROR("Failed to create connection: " << e.what())
Promise<Result, ClientConnectionWeakPtr> promise;
promise.setFailed(ResultConnectError);
return promise.getFuture();
}
LOG_INFO("Created connection for " << logicalAddress);
Future<Result, ClientConnectionWeakPtr> future = cnx->getConnectFuture();
pool_.insert(std::make_pair(logicalAddress, cnx));
lock.unlock();
cnx->tcpConnectAsync();
return future;
}