in lib/PartitionedProducerImpl.cc [445:494]
// Callback for a partition-metadata lookup issued on behalf of this partitioned producer.
//
// - Does nothing unless the producer is in the `Ready` state.
// - On a failed lookup, logs a warning and re-arms the partition update task.
// - On a successful lookup that reports more partitions than are currently tracked, creates one
//   internal producer per new partition, appends them to `producers_` and notifies the interceptors.
// - Otherwise (partition count did not grow) just re-arms the partition update task.
//
// @param result            outcome of the lookup; a value for which `!result` is true means success
// @param lookupDataResult  lookup payload; only dereferenced when the lookup succeeded
void PartitionedProducerImpl::handleGetPartitions(Result result,
                                                  const LookupDataResultPtr& lookupDataResult) {
    if (state_ != Ready) {
        return;
    }

    const bool lookupSucceeded = !result;
    if (!lookupSucceeded) {
        LOG_WARN("Failed to getPartitionMetadata: " << strResult(result));
        runPartitionUpdateTask();
        return;
    }

    {
        // This scope bounds the lifetime of `producersLock`: it is gone before the trailing
        // `runPartitionUpdateTask()` call at the bottom of the function.
        const auto newNumPartitions = static_cast<unsigned int>(lookupDataResult->getPartitions());
        Lock producersLock(producersMutex_);
        const auto currentNumPartitions = getNumPartitions();
        assert(currentNumPartitions == producers_.size());

        if (newNumPartitions > currentNumPartitions) {
            LOG_INFO("new partition count: " << newNumPartitions);
            // NOTE(review): the metadata is replaced before the new producers exist and is not
            // restored when creation fails below -- confirm this cannot leave getNumPartitions()
            // out of sync with producers_.size() (see the assert above) on the next refresh.
            topicMetadata_.reset(new TopicMetadataImpl(newNumPartitions));

            const bool lazyStart = conf_.getLazyStartPartitionedProducers() &&
                                   conf_.getAccessMode() == ProducerConfiguration::Shared;

            // Stage the new producers locally so that a failure part-way through publishes nothing.
            std::vector<ProducerImplPtr> created;
            for (unsigned int partition = currentNumPartitions; partition < newNumPartitions; partition++) {
                try {
                    created.emplace_back(newInternalProducer(partition, lazyStart, true));
                } catch (const std::runtime_error& e) {
                    LOG_ERROR("Failed to create producer for partition " << partition << ": " << e.what());
                    created.clear();
                    break;
                }
            }

            if (created.empty()) {
                // Creation failed: schedule another attempt (still holding the lock, which is
                // released when this function returns).
                runPartitionUpdateTask();
                return;
            }

            for (const auto& producer : created) {
                producers_.emplace_back(producer);
                if (!lazyStart) {
                    producer->start();
                }
            }

            // Release the lock before calling into the interceptors.
            producersLock.unlock();
            interceptors_->onPartitionsChange(getTopic(), newNumPartitions);
            // `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`
            return;
        }
    }

    runPartitionUpdateTask();
}