void MultiTopicsConsumerImpl::subscribeTopicPartitions()

in lib/MultiTopicsConsumerImpl.cc [199:281]


void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicNamePtr topicName,
                                                       const std::string& consumerName,
                                                       ConsumerSubResultPromisePtr topicSubResultPromise) {
    std::shared_ptr<ConsumerImpl> consumer;
    ConsumerConfiguration config = conf_.clone();
    auto client = client_.lock();
    if (!client) {
        topicSubResultPromise->setFailed(ResultAlreadyClosed);
        return;
    }
    ExecutorServicePtr internalListenerExecutor = client->getPartitionListenerExecutorProvider()->get();

    auto weakSelf = weak_from_this();
    config.setMessageListener([this, weakSelf](Consumer consumer, const Message& msg) {
        auto self = weakSelf.lock();
        if (self) {
            messageReceived(consumer, msg);
        }
    });

    int partitions = numPartitions == 0 ? 1 : numPartitions;

    // Apply total limit of receiver queue size across partitions
    config.setReceiverQueueSize(
        std::min(conf_.getReceiverQueueSize(),
                 (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / partitions)));

    Lock lock(mutex_);
    topicsPartitions_[topicName->toString()] = partitions;
    lock.unlock();
    numberTopicPartitions_->fetch_add(partitions);

    std::shared_ptr<std::atomic<int>> partitionsNeedCreate = std::make_shared<std::atomic<int>>(partitions);

    // non-partitioned topic
    if (numPartitions == 0) {
        // We don't have to add partition-n suffix
        try {
            consumer = std::make_shared<ConsumerImpl>(client, topicName->toString(), subscriptionName_,
                                                      config, topicName->isPersistent(), interceptors_,
                                                      internalListenerExecutor, true, NonPartitioned,
                                                      subscriptionMode_, startMessageId_);
        } catch (const std::runtime_error& e) {
            LOG_ERROR("Failed to create ConsumerImpl for " << topicName->toString() << ": " << e.what());
            topicSubResultPromise->setFailed(ResultConnectError);
            return;
        }
        consumer->getConsumerCreatedFuture().addListener(std::bind(
            &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
            std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
        consumers_.emplace(topicName->toString(), consumer);
        LOG_DEBUG("Creating Consumer for - " << topicName << " - " << consumerStr_);
        consumer->start();

    } else {
        std::vector<ConsumerImplPtr> consumers;
        for (int i = 0; i < numPartitions; i++) {
            std::string topicPartitionName = topicName->getTopicPartitionName(i);
            try {
                consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_,
                                                          config, topicName->isPersistent(), interceptors_,
                                                          internalListenerExecutor, true, Partitioned,
                                                          subscriptionMode_, startMessageId_);
            } catch (const std::runtime_error& e) {
                LOG_ERROR("Failed to create ConsumerImpl for " << topicPartitionName << ": " << e.what());
                topicSubResultPromise->setFailed(ResultConnectError);
                return;
            }
            consumers.emplace_back(consumer);
        }
        for (size_t i = 0; i < consumers.size(); i++) {
            std::string topicPartitionName = topicName->getTopicPartitionName(i);
            auto&& consumer = consumers[i];
            consumer->getConsumerCreatedFuture().addListener(std::bind(
                &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
                std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
            consumer->setPartitionIndex(i);
            consumers_.emplace(topicPartitionName, consumer);
            LOG_DEBUG("Creating Consumer for - " << topicPartitionName << " - " << consumerStr_);
            consumer->start();
        }
    }
}