Future ConnectionPool::getConnectionAsync()

in lib/ConnectionPool.cc [76:133]


/**
 * Obtain the connect future of a pooled connection, creating and registering a new
 * connection when the pool holds no usable entry for the requested key.
 *
 * The pool key is derived from all three arguments via getKey(). Pool lookups and
 * modifications are performed while holding mutex_; the TCP connect of a freshly
 * created connection is started only after the mutex has been released.
 *
 * @param logicalAddress  logical address forwarded to getKey() and to the new connection
 * @param physicalAddress physical address forwarded to getKey() and to the new connection
 * @param keySuffix       forwarded to getKey(), to executorProvider_->get() and to the new connection
 * @return the connect future of the pooled or newly created connection, or an already
 *         failed future carrying:
 *         - ResultAlreadyClosed when closed_ is set,
 *         - the thrown Result when the ClientConnection constructor throws a Result,
 *         - ResultConnectError when the constructor throws std::runtime_error.
 */
Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& logicalAddress,
                                                                           const std::string& physicalAddress,
                                                                           size_t keySuffix) {
    using ConnectFuture = Future<Result, ClientConnectionWeakPtr>;

    // Builds a future that has already been completed with the given failure.
    const auto failedFuture = [](Result reason) -> ConnectFuture {
        Promise<Result, ClientConnectionWeakPtr> failed;
        failed.setFailed(reason);
        return failed.getFuture();
    };

    // NOTE: closed_ is read before mutex_ is taken, exactly as in the original flow.
    if (closed_) {
        return failedFuture(ResultAlreadyClosed);
    }

    std::unique_lock<std::recursive_mutex> guard(mutex_);

    const auto key = getKey(logicalAddress, physicalAddress, keySuffix);

    const auto pooledIt = pool_.find(key);
    if (pooledIt != pool_.end()) {
        const auto& pooled = pooledIt->second;

        if (pooled->isClosed()) {
            // A closed connection should already have been removed from the pool in
            // ClientConnection::close; drop the leftover entry and fall through to
            // creating a replacement.
            LOG_WARN("Deleting stale connection from pool for " << key << " use_count: " << (pooled.use_count())
                                                                << " @ " << pooled.get());
            pool_.erase(key);
        } else {
            // The pool holds a valid or still-connecting entry: hand out its future.
            LOG_DEBUG("Got connection from pool for " << key << " use_count: "  //
                                                      << (pooled.use_count()) << " @ " << pooled.get());
            return pooled->getConnectFuture();
        }
    }

    // Nothing usable in the pool: build a brand new connection.
    ClientConnectionPtr created;
    try {
        created.reset(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(keySuffix),
                                           clientConfiguration_, authentication_, clientVersion_, *this,
                                           keySuffix));
    } catch (Result reason) {
        // The lock is released automatically when `guard` goes out of scope.
        return failedFuture(reason);
    } catch (const std::runtime_error& e) {
        // Release the pool lock before logging the failure.
        guard.unlock();
        LOG_ERROR("Failed to create connection: " << e.what());
        return failedFuture(ResultConnectError);
    }

    LOG_INFO("Created connection for " << key);

    // Grab the future and publish the connection while still holding the lock.
    ConnectFuture pending = created->getConnectFuture();
    pool_.emplace(key, created);

    guard.unlock();

    // Start connecting only after the pool lock has been released.
    created->tcpConnectAsync();
    return pending;
}