in lib/ConnectionPool.cc [76:133]
/**
 * Return the connect future of a pooled connection for the given addresses, creating and
 * registering a new connection when the pool holds no usable one.
 *
 * @param logicalAddress forwarded to getKey() and to the ClientConnection constructor
 * @param physicalAddress forwarded to getKey() and to the ClientConnection constructor
 * @param keySuffix forwarded to getKey(), used to select the executor, and passed to the
 *        ClientConnection constructor
 * @return the connect future of the reused or newly created connection, or an already-failed
 *         future carrying:
 *         - ResultAlreadyClosed when closed_ is set on entry,
 *         - the thrown Result when constructing the connection throws a Result,
 *         - ResultConnectError when constructing the connection throws std::runtime_error.
 */
Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
                                                                           const std::string& physicalAddress,
                                                                           size_t keySuffix) {
    // Builds a future that has already been failed with the given result.
    auto failedFuture = [](Result reason) {
        Promise<Result, ClientConnectionWeakPtr> promise;
        promise.setFailed(reason);
        return promise.getFuture();
    };

    // This check runs before mutex_ is acquired.
    if (closed_) {
        return failedFuture(ResultAlreadyClosed);
    }

    std::unique_lock<std::recursive_mutex> lock(mutex_);
    auto key = getKey(logicalAddress, physicalAddress, keySuffix);

    PoolMap::iterator pooledIt = pool_.find(key);
    if (pooledIt != pool_.end()) {
        auto& pooledCnx = pooledIt->second;
        if (pooledCnx->isClosed()) {
            // A closed connection is expected to have been removed from the pool by
            // ClientConnection::close; drop the leftover entry and fall through to create a new one.
            LOG_WARN("Deleting stale connection from pool for " << key << " use_count: "
                                                                << (pooledCnx.use_count()) << " @ "
                                                                << pooledCnx.get());
            pool_.erase(key);
        } else {
            // The pooled connection is not closed (connected or still connecting): reuse it.
            LOG_DEBUG("Got connection from pool for " << key << " use_count: "  //
                                                      << (pooledCnx.use_count()) << " @ "
                                                      << pooledCnx.get());
            return pooledCnx->getConnectFuture();
        }
    }

    // Nothing reusable in the pool for this key: build a fresh connection.
    ClientConnectionPtr freshCnx;
    try {
        freshCnx.reset(new ClientConnection(logicalAddress, physicalAddress,
                                            executorProvider_->get(keySuffix), clientConfiguration_,
                                            authentication_, clientVersion_, *this, keySuffix));
    } catch (Result result) {
        // The lock is released by the unique_lock destructor on return.
        return failedFuture(result);
    } catch (const std::runtime_error& e) {
        // Release the pool lock explicitly before logging the failure.
        lock.unlock();
        LOG_ERROR("Failed to create connection: " << e.what());
        return failedFuture(ResultConnectError);
    }

    LOG_INFO("Created connection for " << key);
    // Grab the future and register the connection while the pool is still locked.
    Future<Result, ClientConnectionWeakPtr> connectFuture = freshCnx->getConnectFuture();
    pool_.insert(std::make_pair(key, freshCnx));

    // NOTE(review): the connect is started only after the pool lock is dropped, presumably so
    // the lock is not held during connection setup - confirm against ClientConnection.
    lock.unlock();
    freshCnx->tcpConnectAsync();
    return connectFuture;
}