void ClientConnection::close()

in lib/ClientConnection.cc [1187:1266]


// Tears this connection down and notifies everything that was waiting on it.
//
// Phase 1 (with `mutex_` held): mark the connection as Disconnected, close the sockets, drop the
// executor reference, move the handler and pending-request maps into locals, and cancel/stop the
// timers.
// Phase 2 (after `mutex_` is released): hand every producer and consumer to
// HandlerBase::handleDisconnection and fail every outstanding promise with `result`.
//
// Returns immediately, without side effects, when isClosed() is already true.
//
// @param result the result passed to the handlers and used to fail all pending promises
void ClientConnection::close(Result result) {
    Lock lock(mutex_);
    if (isClosed()) {
        return;
    }
    state_ = Disconnected;

    closeSocket();
    if (tlsSocket_) {
        // Closing the lowest layer is best-effort: a failure is only reported, never propagated
        boost::system::error_code ec;
        tlsSocket_->lowest_layer().close(ec);
        if (ec) {
            LOG_WARN(cnxString_ << "Failed to close TLS socket: " << ec.message());
        }
    }

    if (executor_) {
        executor_.reset();
    }

    // Take the internal collections out of the object while `mutex_` is still held, so that they
    // can be processed once the lock has been released
    auto detachedConsumers = std::move(consumers_);
    auto detachedProducers = std::move(producers_);
    auto failedRequests = std::move(pendingRequests_);
    auto failedLookups = std::move(pendingLookupRequests_);
    numOfPendingLookupRequest_ = 0;
    auto failedConsumerStats = std::move(pendingConsumerStatsMap_);
    auto failedLastMessageIds = std::move(pendingGetLastMessageIdRequests_);
    auto failedNamespaceTopics = std::move(pendingGetNamespaceTopicsRequests_);

    if (keepAliveTimer_) {
        keepAliveTimer_->cancel();
        keepAliveTimer_.reset();
    }

    if (consumerStatsRequestTimer_) {
        consumerStatsRequestTimer_->cancel();
        consumerStatsRequestTimer_.reset();
    }

    if (connectTimeoutTask_) {
        connectTimeoutTask_->stop();
    }

    lock.unlock();

    // ResultDisconnected and ResultRetryable are reported at INFO level, any other result at ERROR
    const bool logAsInfo = (result == ResultDisconnected || result == ResultRetryable);
    if (logAsInfo) {
        LOG_INFO(cnxString_ << "Connection disconnected");
    } else {
        LOG_ERROR(cnxString_ << "Connection closed with " << result);
    }

    // Producers are notified first, then consumers
    for (auto& producerEntry : detachedProducers) {
        HandlerBase::handleDisconnection(result, shared_from_this(), producerEntry.second);
    }
    for (auto& consumerEntry : detachedConsumers) {
        HandlerBase::handleDisconnection(result, shared_from_this(), consumerEntry.second);
    }

    connectPromise_.setFailed(result);

    // Every remaining map holds, per entry, a promise (directly, as a member, or behind a pointer);
    // each of them is completed with `result`
    for (auto& request : failedRequests) {
        request.second.promise.setFailed(result);
    }
    for (auto& lookup : failedLookups) {
        lookup.second.promise->setFailed(result);
    }
    for (auto& stats : failedConsumerStats) {
        LOG_ERROR(cnxString_ << " Closing Client Connection, please try again later");
        stats.second.setFailed(result);
    }
    for (auto& lastMessageId : failedLastMessageIds) {
        lastMessageId.second.promise->setFailed(result);
    }
    for (auto& namespaceTopics : failedNamespaceTopics) {
        namespaceTopics.second.setFailed(result);
    }
}