in lib/ClientConnection.cc [1187:1266]
void ClientConnection::close(Result result) {
Lock lock(mutex_);
if (isClosed()) {
return;
}
state_ = Disconnected;
closeSocket();
if (tlsSocket_) {
boost::system::error_code err;
tlsSocket_->lowest_layer().close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close TLS socket: " << err.message());
}
}
if (executor_) {
executor_.reset();
}
// Move the internal fields to process them after `mutex_` was unlocked
auto consumers = std::move(consumers_);
auto producers = std::move(producers_);
auto pendingRequests = std::move(pendingRequests_);
auto pendingLookupRequests = std::move(pendingLookupRequests_);
auto pendingConsumerStatsMap = std::move(pendingConsumerStatsMap_);
auto pendingGetLastMessageIdRequests = std::move(pendingGetLastMessageIdRequests_);
auto pendingGetNamespaceTopicsRequests = std::move(pendingGetNamespaceTopicsRequests_);
numOfPendingLookupRequest_ = 0;
if (keepAliveTimer_) {
keepAliveTimer_->cancel();
keepAliveTimer_.reset();
}
if (consumerStatsRequestTimer_) {
consumerStatsRequestTimer_->cancel();
consumerStatsRequestTimer_.reset();
}
if (connectTimeoutTask_) {
connectTimeoutTask_->stop();
}
lock.unlock();
if (result != ResultDisconnected && result != ResultRetryable) {
LOG_ERROR(cnxString_ << "Connection closed with " << result);
} else {
LOG_INFO(cnxString_ << "Connection disconnected");
}
for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) {
HandlerBase::handleDisconnection(result, shared_from_this(), it->second);
}
for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) {
HandlerBase::handleDisconnection(result, shared_from_this(), it->second);
}
connectPromise_.setFailed(result);
// Fail all pending requests, all these type are map whose value type contains the Promise object
for (auto& kv : pendingRequests) {
kv.second.promise.setFailed(result);
}
for (auto& kv : pendingLookupRequests) {
kv.second.promise->setFailed(result);
}
for (auto& kv : pendingConsumerStatsMap) {
LOG_ERROR(cnxString_ << " Closing Client Connection, please try again later");
kv.second.setFailed(result);
}
for (auto& kv : pendingGetLastMessageIdRequests) {
kv.second.promise->setFailed(result);
}
for (auto& kv : pendingGetNamespaceTopicsRequests) {
kv.second.setFailed(result);
}
}