in lib/MultiTopicsConsumerImpl.cc [199:281]
void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicNamePtr topicName,
const std::string& consumerName,
ConsumerSubResultPromisePtr topicSubResultPromise) {
std::shared_ptr<ConsumerImpl> consumer;
ConsumerConfiguration config = conf_.clone();
auto client = client_.lock();
if (!client) {
topicSubResultPromise->setFailed(ResultAlreadyClosed);
return;
}
ExecutorServicePtr internalListenerExecutor = client->getPartitionListenerExecutorProvider()->get();
auto weakSelf = weak_from_this();
config.setMessageListener([this, weakSelf](Consumer consumer, const Message& msg) {
auto self = weakSelf.lock();
if (self) {
messageReceived(consumer, msg);
}
});
int partitions = numPartitions == 0 ? 1 : numPartitions;
// Apply total limit of receiver queue size across partitions
config.setReceiverQueueSize(
std::min(conf_.getReceiverQueueSize(),
(int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / partitions)));
Lock lock(mutex_);
topicsPartitions_[topicName->toString()] = partitions;
lock.unlock();
numberTopicPartitions_->fetch_add(partitions);
std::shared_ptr<std::atomic<int>> partitionsNeedCreate = std::make_shared<std::atomic<int>>(partitions);
// non-partitioned topic
if (numPartitions == 0) {
// We don't have to add partition-n suffix
try {
consumer = std::make_shared<ConsumerImpl>(client, topicName->toString(), subscriptionName_,
config, topicName->isPersistent(), interceptors_,
internalListenerExecutor, true, NonPartitioned,
subscriptionMode_, startMessageId_);
} catch (const std::runtime_error& e) {
LOG_ERROR("Failed to create ConsumerImpl for " << topicName->toString() << ": " << e.what());
topicSubResultPromise->setFailed(ResultConnectError);
return;
}
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
consumers_.emplace(topicName->toString(), consumer);
LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_);
consumer->start();
} else {
std::vector<ConsumerImplPtr> consumers;
for (int i = 0; i < numPartitions; i++) {
std::string topicPartitionName = topicName->getTopicPartitionName(i);
try {
consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_,
config, topicName->isPersistent(), interceptors_,
internalListenerExecutor, true, Partitioned,
subscriptionMode_, startMessageId_);
} catch (const std::runtime_error& e) {
LOG_ERROR("Failed to create ConsumerImpl for " << topicPartitionName << ": " << e.what());
topicSubResultPromise->setFailed(ResultConnectError);
return;
}
consumers.emplace_back(consumer);
}
for (size_t i = 0; i < consumers.size(); i++) {
std::string topicPartitionName = topicName->getTopicPartitionName(i);
auto&& consumer = consumers[i];
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
consumer->setPartitionIndex(i);
consumers_.emplace(topicPartitionName, consumer);
LOG_DEBUG("Creating Consumer for - " << topicPartitionName << " - " << consumerStr_);
consumer->start();
}
}
}