in lib/ClientConnection.cc [1278:1381]
void ClientConnection::close(Result result, bool detach) {
Lock lock(mutex_);
if (isClosed()) {
return;
}
state_ = Disconnected;
if (socket_) {
ASIO_ERROR err;
socket_->shutdown(ASIO::socket_base::shutdown_both, err);
socket_->close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}
}
if (tlsSocket_) {
ASIO_ERROR err;
tlsSocket_->lowest_layer().close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close TLS socket: " << err.message());
}
}
if (executor_) {
executor_.reset();
}
// Move the internal fields to process them after `mutex_` was unlocked
auto consumers = std::move(consumers_);
auto producers = std::move(producers_);
auto pendingRequests = std::move(pendingRequests_);
auto pendingLookupRequests = std::move(pendingLookupRequests_);
auto pendingConsumerStatsMap = std::move(pendingConsumerStatsMap_);
auto pendingGetLastMessageIdRequests = std::move(pendingGetLastMessageIdRequests_);
auto pendingGetNamespaceTopicsRequests = std::move(pendingGetNamespaceTopicsRequests_);
auto pendingGetSchemaRequests = std::move(pendingGetSchemaRequests_);
numOfPendingLookupRequest_ = 0;
if (keepAliveTimer_) {
keepAliveTimer_->cancel();
keepAliveTimer_.reset();
}
if (consumerStatsRequestTimer_) {
consumerStatsRequestTimer_->cancel();
consumerStatsRequestTimer_.reset();
}
if (connectTimeoutTask_) {
connectTimeoutTask_->stop();
}
lock.unlock();
int refCount = weak_from_this().use_count();
if (!isResultRetryable(result)) {
LOG_ERROR(cnxString_ << "Connection closed with " << result << " (refCnt: " << refCount << ")");
} else {
LOG_INFO(cnxString_ << "Connection disconnected (refCnt: " << refCount << ")");
}
// Remove the connection from the pool before completing any promise
if (detach) {
pool_.remove(logicalAddress_, physicalAddress_, poolIndex_, this);
}
auto self = shared_from_this();
for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) {
auto producer = it->second.lock();
if (producer) {
producer->handleDisconnection(result, self);
}
}
for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) {
auto consumer = it->second.lock();
if (consumer) {
consumer->handleDisconnection(result, self);
}
}
self.reset();
connectPromise_.setFailed(result);
// Fail all pending requests, all these type are map whose value type contains the Promise object
for (auto& kv : pendingRequests) {
kv.second.promise.setFailed(result);
}
for (auto& kv : pendingLookupRequests) {
kv.second.promise->setFailed(result);
}
for (auto& kv : pendingConsumerStatsMap) {
LOG_ERROR(cnxString_ << " Closing Client Connection, please try again later");
kv.second.setFailed(result);
}
for (auto& kv : pendingGetLastMessageIdRequests) {
kv.second.promise->setFailed(result);
}
for (auto& kv : pendingGetNamespaceTopicsRequests) {
kv.second.setFailed(result);
}
for (auto& kv : pendingGetSchemaRequests) {
kv.second.promise.setFailed(result);
}
}