void ClientConnection::close()

in lib/ClientConnection.cc [1278:1381]


void ClientConnection::close(Result result, bool detach) {
    Lock lock(mutex_);
    if (isClosed()) {
        return;
    }
    state_ = Disconnected;

    if (socket_) {
        ASIO_ERROR err;
        socket_->shutdown(ASIO::socket_base::shutdown_both, err);
        socket_->close(err);
        if (err) {
            LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
        }
    }
    if (tlsSocket_) {
        ASIO_ERROR err;
        tlsSocket_->lowest_layer().close(err);
        if (err) {
            LOG_WARN(cnxString_ << "Failed to close TLS socket: " << err.message());
        }
    }

    if (executor_) {
        executor_.reset();
    }

    // Move the internal fields to process them after `mutex_` was unlocked
    auto consumers = std::move(consumers_);
    auto producers = std::move(producers_);
    auto pendingRequests = std::move(pendingRequests_);
    auto pendingLookupRequests = std::move(pendingLookupRequests_);
    auto pendingConsumerStatsMap = std::move(pendingConsumerStatsMap_);
    auto pendingGetLastMessageIdRequests = std::move(pendingGetLastMessageIdRequests_);
    auto pendingGetNamespaceTopicsRequests = std::move(pendingGetNamespaceTopicsRequests_);
    auto pendingGetSchemaRequests = std::move(pendingGetSchemaRequests_);

    numOfPendingLookupRequest_ = 0;

    if (keepAliveTimer_) {
        keepAliveTimer_->cancel();
        keepAliveTimer_.reset();
    }

    if (consumerStatsRequestTimer_) {
        consumerStatsRequestTimer_->cancel();
        consumerStatsRequestTimer_.reset();
    }

    if (connectTimeoutTask_) {
        connectTimeoutTask_->stop();
    }

    lock.unlock();
    int refCount = weak_from_this().use_count();
    if (!isResultRetryable(result)) {
        LOG_ERROR(cnxString_ << "Connection closed with " << result << " (refCnt: " << refCount << ")");
    } else {
        LOG_INFO(cnxString_ << "Connection disconnected (refCnt: " << refCount << ")");
    }
    // Remove the connection from the pool before completing any promise
    if (detach) {
        pool_.remove(logicalAddress_, physicalAddress_, poolIndex_, this);
    }

    auto self = shared_from_this();
    for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) {
        auto producer = it->second.lock();
        if (producer) {
            producer->handleDisconnection(result, self);
        }
    }

    for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) {
        auto consumer = it->second.lock();
        if (consumer) {
            consumer->handleDisconnection(result, self);
        }
    }
    self.reset();

    connectPromise_.setFailed(result);

    // Fail all pending requests, all these type are map whose value type contains the Promise object
    for (auto& kv : pendingRequests) {
        kv.second.promise.setFailed(result);
    }
    for (auto& kv : pendingLookupRequests) {
        kv.second.promise->setFailed(result);
    }
    for (auto& kv : pendingConsumerStatsMap) {
        LOG_ERROR(cnxString_ << " Closing Client Connection, please try again later");
        kv.second.setFailed(result);
    }
    for (auto& kv : pendingGetLastMessageIdRequests) {
        kv.second.promise->setFailed(result);
    }
    for (auto& kv : pendingGetNamespaceTopicsRequests) {
        kv.second.setFailed(result);
    }
    for (auto& kv : pendingGetSchemaRequests) {
        kv.second.promise.setFailed(result);
    }
}