in lib/ClientImpl.cc [590:635]
// Asynchronously closes the client and every producer/consumer it still tracks.
//
// If the client is not in the Open state nothing is closed and `callback`
// (when set) is invoked right away with ResultAlreadyClosed. Otherwise the
// state is switched to Closing, the memory limit controller and the lookup
// service are closed, and closeAsync() is issued on each producer/consumer
// that is still alive and not yet closed. Every such completion is forwarded
// to handleClose() together with a shared counter of outstanding handlers and
// the user callback.
//
// `callback` may be empty; the close sequence is carried out either way.
void ClientImpl::closeAsync(CloseCallback callback) {
    if (state_ != Open) {
        if (callback) {
            callback(ResultAlreadyClosed);
        }
        return;
    }
    // Set the state to Closing so that no producers could get added
    state_ = Closing;

    memoryLimitController_.close();
    lookupServicePtr_->close();

    // Take over the registered handlers; the loops below work on these local
    // containers rather than on the members.
    auto producers = producers_.move();
    auto consumers = consumers_.move();

    // Starts at the number of registered handlers and is decremented below for
    // each one that does not need an asynchronous close.
    // NOTE(review): the counter is a plain int shared with the completion
    // callbacks. If those callbacks can run on another thread while the loops
    // below are still decrementing, the updates are unsynchronized - confirm
    // against handleClose() and the threading model of closeAsync().
    SharedInt numberOfOpenHandlers = std::make_shared<int>(producers.size() + consumers.size());
    LOG_INFO("Closing Pulsar client with " << producers.size() << " producers and " << consumers.size()
                                           << " consumers");

    // One completion callback shared by all handlers. It keeps this client
    // alive (shared_from_this) until the last handler has reported back.
    auto onHandlerClosed = std::bind(&ClientImpl::handleClose, shared_from_this(), std::placeholders::_1,
                                     numberOfOpenHandlers, callback);

    for (auto&& kv : producers) {
        ProducerImplBasePtr producer = kv.second.lock();
        if (producer && !producer->isClosed()) {
            producer->closeAsync(onHandlerClosed);
        } else {
            // Already destroyed or closed: nothing to wait for
            (*numberOfOpenHandlers)--;
        }
    }

    for (auto&& kv : consumers) {
        ConsumerImplBasePtr consumer = kv.second.lock();
        if (consumer && !consumer->isClosed()) {
            consumer->closeAsync(onHandlerClosed);
        } else {
            // Already destroyed or closed: nothing to wait for
            (*numberOfOpenHandlers)--;
        }
    }

    // With no handler left to wait for, no completion will reach handleClose()
    // through the callbacks above, so invoke it directly. This must not depend
    // on `callback` being set: the per-handler path forwards a possibly empty
    // callback to handleClose() too, and skipping the call here would mean
    // nothing in this function finishes the close after entering Closing.
    if (*numberOfOpenHandlers == 0) {
        handleClose(ResultOk, numberOfOpenHandlers, callback);
    }
}