void PartitionedProducerImpl::handleGetPartitions()

in lib/PartitionedProducerImpl.cc [445:494]


void PartitionedProducerImpl::handleGetPartitions(Result result,
                                                  const LookupDataResultPtr& lookupDataResult) {
    if (state_ != Ready) {
        return;
    }

    if (!result) {
        const auto newNumPartitions = static_cast<unsigned int>(lookupDataResult->getPartitions());
        Lock producersLock(producersMutex_);
        const auto currentNumPartitions = getNumPartitions();
        assert(currentNumPartitions == producers_.size());
        if (newNumPartitions > currentNumPartitions) {
            LOG_INFO("new partition count: " << newNumPartitions);
            topicMetadata_.reset(new TopicMetadataImpl(newNumPartitions));

            std::vector<ProducerImplPtr> producers;
            auto lazy = conf_.getLazyStartPartitionedProducers() &&
                        conf_.getAccessMode() == ProducerConfiguration::Shared;
            for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
                ProducerImplPtr producer;
                try {
                    producer = newInternalProducer(i, lazy, true);
                } catch (const std::runtime_error& e) {
                    LOG_ERROR("Failed to create producer for partition " << i << ": " << e.what());
                    producers.clear();
                    break;
                }
                producers.emplace_back(producer);
            }
            if (producers.empty()) {
                runPartitionUpdateTask();
                return;
            }
            for (unsigned int i = 0; i < producers.size(); i++) {
                auto&& producer = producers[i];
                producers_.emplace_back(producer);
                if (!lazy) {
                    producer->start();
                }
            }
            producersLock.unlock();
            interceptors_->onPartitionsChange(getTopic(), newNumPartitions);
            // `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`
            return;
        }
    } else {
        LOG_WARN("Failed to getPartitionMetadata: " << strResult(result));
    }
    runPartitionUpdateTask();
}