Future ConnectionPool::getConnectionAsync()

in lib/ConnectionPool.cc [64:115]


/**
 * Obtain a future for a connection keyed by @p logicalAddress.
 *
 * Steps, in order:
 *  1. If the pool is flagged as closed, an already-failed future carrying
 *     ResultAlreadyClosed is returned. This flag is read before mutex_ is taken.
 *  2. With mutex_ held and poolConnections_ enabled, the pool entry for
 *     @p logicalAddress is looked up. If its weak reference can still be locked
 *     and the connection does not report isClosed(), that connection's connect
 *     future is returned. Otherwise the entry is erased.
 *  3. A new ClientConnection is constructed. A std::runtime_error thrown during
 *     construction is logged and turned into a failed future carrying
 *     ResultConnectError.
 *  4. The new connection is inserted into pool_ (regardless of
 *     poolConnections_), mutex_ is released, and tcpConnectAsync() is invoked
 *     on the connection.
 *
 * @param logicalAddress  key used for the pool lookup; also forwarded to the
 *                        ClientConnection constructor
 * @param physicalAddress forwarded to the ClientConnection constructor
 * @return the connect future of the reused or newly created connection, or an
 *         already-failed future as described above
 */
Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
    const std::string& logicalAddress, const std::string& physicalAddress) {
    // Produces a future that has already been failed with the given result code.
    const auto failedFuture = [](Result reason) -> Future<Result, ClientConnectionWeakPtr> {
        Promise<Result, ClientConnectionWeakPtr> failure;
        failure.setFailed(reason);
        return failure.getFuture();
    };

    if (closed_) {
        return failedFuture(ResultAlreadyClosed);
    }

    std::unique_lock<std::mutex> guard(mutex_);

    if (poolConnections_) {
        const auto entry = pool_.find(logicalAddress);
        if (entry != pool_.end()) {
            // The pool stores weak references; lock() yields an empty pointer once the
            // connection object has been destroyed.
            ClientConnectionPtr pooled = entry->second.lock();
            const bool reusable = pooled && !pooled->isClosed();

            if (!reusable) {
                // Entry is dead or closed: drop it and fall through to create a replacement
                LOG_INFO("Deleting stale connection from pool for "
                         << logicalAddress << " use_count: " << (pooled.use_count() - 1) << " @ "
                         << pooled.get());
                pool_.erase(logicalAddress);
            } else {
                // Entry refers to a valid or still-connecting connection: share it
                LOG_DEBUG("Got connection from pool for " << logicalAddress << " use_count: "  //
                                                          << (pooled.use_count() - 1) << " @ "
                                                          << pooled.get());
                return pooled->getConnectFuture();
            }
        }
    }

    // Nothing reusable in the pool, so build a brand new connection
    ClientConnectionPtr created;
    try {
        created.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(),
                                           clientConfiguration_, authentication_, clientVersion_));
    } catch (const std::runtime_error& error) {
        // Release the pool mutex before logging and reporting the failure
        guard.unlock();
        LOG_ERROR("Failed to create connection: " << error.what());
        return failedFuture(ResultConnectError);
    }

    LOG_INFO("Created connection for " << logicalAddress);

    Future<Result, ClientConnectionWeakPtr> connectFuture = created->getConnectFuture();
    pool_.insert(std::make_pair(logicalAddress, created));

    // The TCP connect is started only after the pool mutex has been released
    guard.unlock();

    created->tcpConnectAsync();
    return connectFuture;
}