in lib/MultiTopicsConsumerImpl.cc [451:512]
void MultiTopicsConsumerImpl::closeAsync(ResultCallback originalCallback) {
std::weak_ptr<MultiTopicsConsumerImpl> weakSelf{get_shared_this_ptr()};
auto callback = [weakSelf, originalCallback](Result result) {
auto self = weakSelf.lock();
if (self) {
self->shutdown();
if (result != ResultOk) {
LOG_WARN(self->getName() << "Failed to close consumer: " << result);
if (result != ResultAlreadyClosed) {
self->state_ = Failed;
}
}
}
if (originalCallback) {
originalCallback(result);
}
};
const auto state = state_.load();
if (state == Closing || state == Closed) {
callback(ResultOk);
return;
}
state_ = Closing;
cancelTimers();
auto consumers = consumers_.move();
*numberTopicPartitions_ = 0;
if (consumers.empty()) {
LOG_DEBUG("TopicsConsumer have no consumers to close "
<< " topic" << topic() << " subscription - " << subscriptionName_);
callback(ResultOk);
return;
}
auto numConsumers = std::make_shared<std::atomic<size_t>>(consumers.size());
for (auto&& kv : consumers) {
auto& name = kv.first;
auto& consumer = kv.second;
consumer->closeAsync([name, numConsumers, callback](Result result) {
const auto numConsumersLeft = --*numConsumers;
LOG_DEBUG("Closing the consumer for partition - " << name << " numConsumersLeft - "
<< numConsumersLeft);
if (result != ResultOk) {
LOG_ERROR("Closing the consumer failed for partition - " << name << " with error - "
<< result);
}
if (numConsumersLeft == 0) {
callback(result);
}
});
}
// fail pending receive
failPendingReceiveCallback();
failPendingBatchReceiveCallback();
// cancel timer
batchReceiveTimer_->cancel();
}