void ClientImpl::closeAsync()

in lib/ClientImpl.cc [590:635]


/**
 * Start closing the client without blocking the caller.
 *
 * If the client is not in the Open state, the callback (when set) is invoked
 * immediately with ResultAlreadyClosed and nothing else happens.
 *
 * Otherwise the client is switched to Closing, the memory limit controller and
 * the lookup service are closed, and closeAsync() is requested on every
 * producer and consumer that is still alive and not already closed. Each of
 * those close requests reports back through handleClose(), which shares one
 * counter of outstanding handlers with this function.
 *
 * @param callback completion callback; may be empty. It is forwarded to
 *                 handleClose(), which is presumably responsible for invoking
 *                 it once the last handler has been closed (handleClose() is
 *                 not visible here -- confirm).
 */
void ClientImpl::closeAsync(CloseCallback callback) {
    if (state_ != Open) {
        if (callback) {
            callback(ResultAlreadyClosed);
        }
        return;
    }
    // Set the state to Closing so that no producers could get added
    state_ = Closing;

    memoryLimitController_.close();
    lookupServicePtr_->close();

    // Take the registered handlers out of the member containers and work on the local copies only.
    auto producers = producers_.move();
    auto consumers = consumers_.move();

    // Number of handlers whose close has not been accounted for yet. It is shared with every
    // handleClose() invocation bound below.
    // NOTE(review): the counter is a plain int; if close callbacks can run on another thread while
    // the loops below are still executing, the decrements here are unsynchronized -- confirm.
    SharedInt numberOfOpenHandlers = std::make_shared<int>(producers.size() + consumers.size());
    LOG_INFO("Closing Pulsar client with " << producers.size() << " producers and " << consumers.size()
                                           << " consumers");

    for (auto&& kv : producers) {
        ProducerImplBasePtr producer = kv.second.lock();
        if (producer && !producer->isClosed()) {
            producer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
                                           std::placeholders::_1, numberOfOpenHandlers, callback));
        } else {
            // The producer is gone or already closed, so no close callback will arrive for it
            (*numberOfOpenHandlers)--;
        }
    }

    for (auto&& kv : consumers) {
        ConsumerImplBasePtr consumer = kv.second.lock();
        if (consumer && !consumer->isClosed()) {
            consumer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
                                           std::placeholders::_1, numberOfOpenHandlers, callback));
        } else {
            // The consumer is gone or already closed, so no close callback will arrive for it
            (*numberOfOpenHandlers)--;
        }
    }

    // Nothing is left to wait for, so run the completion step directly. This must not be gated on
    // `callback`: handleClose() already receives a possibly-empty callback from the bind expressions
    // above, and skipping it here would leave a client with no open handlers and no callback stuck
    // in the Closing state without ever running its completion logic.
    if (*numberOfOpenHandlers == 0) {
        handleClose(ResultOk, numberOfOpenHandlers, callback);
    }
}