void MultiTopicsConsumerImpl::closeAsync()

in lib/MultiTopicsConsumerImpl.cc [455:516]


// Asynchronously closes every sub-consumer held by this multi-topics consumer and reports the
// overall outcome through `originalCallback` (which may be empty).
//
// Result delivered to the callback:
//  - ResultAlreadyClosed when the consumer is already Closing/Closed, or when there are no
//    sub-consumers to close.
//  - Otherwise the aggregated result of all sub-consumer closes: ResultOk only if every one of
//    them succeeded. A real error takes precedence over ResultAlreadyClosed, and the first
//    recorded real error wins.
void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
    // The completion handler holds only a weak reference to this consumer; if the consumer is gone
    // by the time it runs, the state handling is skipped and only the user callback is invoked.
    std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
    auto callback = [weakSelf, originalCallback](Result result) {
        auto self = weakSelf.lock();
        if (self) {
            self->shutdown();
            if (result != ResultOk) {
                LOG_WARN(self->getName() << "Failed to close consumer: " << result);
                // ResultAlreadyClosed is not treated as a failure of this consumer.
                if (result != ResultAlreadyClosed) {
                    self->state_ = Failed;
                }
            }
        }
        if (originalCallback) {
            originalCallback(result);
        }
    };

    // NOTE(review): the load here and the store to Closing below are two separate operations, so
    // two concurrent closeAsync() calls could both pass this check — confirm whether callers
    // serialize close before relying on it.
    const auto state = state_.load();
    if (state == Closing || state == Closed) {
        callback(ResultAlreadyClosed);
        return;
    }

    state_ = Closing;

    cancelTimers();

    // Take the sub-consumers out of the member container; from here on they are only reachable
    // through the local `consumers`.
    auto consumers = consumers_.move();
    *numberTopicPartitions_ = 0;
    if (consumers.empty()) {
        LOG_DEBUG("TopicsConsumer have no consumers to close "
                  << " topic" << topic_ << " subscription - " << subscriptionName_);
        callback(ResultAlreadyClosed);
        return;
    }

    // Shared between all per-consumer close callbacks: the number of closes still outstanding and
    // the aggregated outcome. Reporting only the result of whichever consumer finishes last would
    // hide a failure of any consumer that finished earlier.
    auto numConsumers = std::make_shared<std::atomic<size_t>>(consumers.size());
    auto closeResult = std::make_shared<std::atomic<Result>>(ResultOk);
    for (auto&& kv : consumers) {
        auto& name = kv.first;
        auto& consumer = kv.second;
        consumer->closeAsync([name, numConsumers, closeResult, callback](Result result) {
            if (result != ResultOk) {
                // Record the failure BEFORE decrementing the counter, so the callback that
                // observes the counter reaching zero is guaranteed to see it.
                // ResultOk may be replaced by any failure; ResultAlreadyClosed may only be
                // replaced by a real error; a recorded real error is never overwritten.
                Result recorded = closeResult->load();
                while ((recorded == ResultOk ||
                        (recorded == ResultAlreadyClosed && result != ResultAlreadyClosed)) &&
                       !closeResult->compare_exchange_weak(recorded, result)) {
                }
            }

            const auto numConsumersLeft = --*numConsumers;
            LOG_DEBUG("Closing the consumer for partition - " << name << " numConsumersLeft - "
                                                              << numConsumersLeft);

            if (result != ResultOk) {
                LOG_ERROR("Closing the consumer failed for partition - " << name << " with error - "
                                                                         << result);
            }
            // Only the last sub-consumer to finish completes the overall close.
            if (numConsumersLeft == 0) {
                callback(closeResult->load());
            }
        });
    }

    // fail pending receive
    failPendingReceiveCallback();
    failPendingBatchReceiveCallback();

    // cancel timer
    batchReceiveTimer_->cancel();
}