lib/PartitionedProducerImpl.cc (403 lines of code) (raw):

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include "PartitionedProducerImpl.h"

#include <sstream>

#include "ClientImpl.h"
#include "ExecutorService.h"
#include "LogUtils.h"
#include "LookupService.h"
#include "ProducerImpl.h"
#include "RoundRobinMessageRouter.h"
#include "SinglePartitionMessageRouter.h"
#include "TopicMetadataImpl.h"
#include "TopicName.h"

DECLARE_LOG_OBJECT()

namespace pulsar {

const std::string PartitionedProducerImpl::PARTITION_NAME_SUFFIX = "-partition-";

PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const TopicNamePtr topicName,
                                                 const unsigned int numPartitions,
                                                 const ProducerConfiguration& config,
                                                 const ProducerInterceptorsPtr& interceptors)
    : client_(client),
      topicName_(topicName),
      topic_(topicName_->toString()),
      conf_(config),
      topicMetadata_(new TopicMetadataImpl(numPartitions)),
      flushedPartitions_(0),
      interceptors_(interceptors) {
    routerPolicy_ = getMessageRouter();

    int maxPendingMessagesPerPartition =
        std::min(config.getMaxPendingMessages(),
                 (int)(config.getMaxPendingMessagesAcrossPartitions() / numPartitions));
    conf_.setMaxPendingMessages(maxPendingMessagesPerPartition);

    auto partitionsUpdateInterval = static_cast<unsigned int>(client->conf().getPartitionsUpdateInterval());
    if (partitionsUpdateInterval > 0) {
        listenerExecutor_ = client->getListenerExecutorProvider()->get();
        partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
        partitionsUpdateInterval_ = std::chrono::seconds(partitionsUpdateInterval);
        lookupServicePtr_ = client->getLookup();
    }
}

MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
    switch (conf_.getPartitionsRoutingMode()) {
        case ProducerConfiguration::RoundRobinDistribution:
            return std::make_shared<RoundRobinMessageRouter>(
                conf_.getHashingScheme(), conf_.getBatchingEnabled(), conf_.getBatchingMaxMessages(),
                conf_.getBatchingMaxAllowedSizeInBytes(),
                std::chrono::milliseconds(conf_.getBatchingMaxPublishDelayMs()));
        case ProducerConfiguration::CustomPartition:
            return conf_.getMessageRouterPtr();
        case ProducerConfiguration::UseSinglePartition:
        default:
            return std::make_shared<SinglePartitionMessageRouter>(getNumPartitions(),
                                                                  conf_.getHashingScheme());
    }
}

PartitionedProducerImpl::~PartitionedProducerImpl() { shutdown(); }

// override
const std::string& PartitionedProducerImpl::getTopic() const { return topic_; }

unsigned int PartitionedProducerImpl::getNumPartitions() const {
    return static_cast<unsigned int>(topicMetadata_->getNumPartitions());
}

unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
    Lock lock(producersMutex_);
    return getNumPartitions();
}

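// Creates the ProducerImpl for a single partition. When `lazy` is true the producer is only
// constructed and counted via createLazyPartitionProducer(); it is started later, on first use
// from sendAsync(). Otherwise the creation result is reported back through
// handleSinglePartitionProducerCreated().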
ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy,
                                                             bool retryOnCreationError) {
    using namespace std::placeholders;
    auto client = client_.lock();
    auto producer = std::make_shared<ProducerImpl>(client, *topicName_, conf_, interceptors_, partition,
                                                   retryOnCreationError);
    if (!client) {
        return producer;
    }

    if (lazy) {
        createLazyPartitionProducer(partition);
    } else {
        producer->getProducerCreatedFuture().addListener(
            std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
                      const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
    }

    LOG_DEBUG("Creating Producer for single Partition - " << topicName_ << "-partition-" << partition);
    return producer;
}

// override
void PartitionedProducerImpl::start() {
    // create producer per partition
    // Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be
    // increased when `state_` is Ready
    if (conf_.getLazyStartPartitionedProducers() && conf_.getAccessMode() == ProducerConfiguration::Shared) {
        // start one producer now, to ensure authz errors occur now
        // if the SinglePartition router is used, then this producer will serve
        // all non-keyed messages in the future
        Message msg = MessageBuilder().setContent("x").build();
        short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_));

        for (unsigned int i = 0; i < getNumPartitions(); i++) {
            bool lazy = (short)i != partition;
            producers_.push_back(newInternalProducer(i, lazy, false));
        }

        producers_[partition]->start();
    } else {
        for (unsigned int i = 0; i < getNumPartitions(); i++) {
            producers_.push_back(newInternalProducer(i, false, false));
        }

        for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
            (*prod)->start();
        }
    }
}

void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result,
                                                                   ProducerImplBaseWeakPtr producerWeakPtr,
                                                                   unsigned int partitionIndex) {
    // to indicate, we are doing cleanup using closeAsync after producer create
    // has failed and the invocation of closeAsync is not from client
    const auto numPartitions = getNumPartitionsWithLock();
    assert(numProducersCreated_ <= numPartitions && partitionIndex <= numPartitions);

    if (state_ == Closing) {
        return;
    }
    if (state_ == Failed) {
        // We have already informed client that producer creation failed
        if (++numProducersCreated_ == numPartitions) {
            closeAsync(nullptr);
        }
        return;
    }

    if (result != ResultOk) {
        LOG_ERROR("Unable to create Producer for partition - " << partitionIndex << " Error - " << result);
        partitionedProducerCreatedPromise_.setFailed(result);
        state_ = Failed;
        if (++numProducersCreated_ == numPartitions) {
            closeAsync(nullptr);
        }
        return;
    }

    if (++numProducersCreated_ == numPartitions) {
        state_ = Ready;
        if (partitionsUpdateTimer_) {
            runPartitionUpdateTask();
        }
        partitionedProducerCreatedPromise_.setValue(shared_from_this());
    }
}

void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partitionIndex) {
    const auto numPartitions = getNumPartitions();
    assert(numProducersCreated_ <= numPartitions);
    assert(partitionIndex <= numPartitions);
    numProducersCreated_++;
    if (numProducersCreated_ == numPartitions) {
        state_ = Ready;
        if (partitionsUpdateTimer_) {
            runPartitionUpdateTask();
        }
        partitionedProducerCreatedPromise_.setValue(shared_from_this());
    }
}

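// Picks a partition for `msg` via the configured router policy and delegates the send to that
// partition's internal producer. If the partition producer is lazily started and not yet ready,
// the send is deferred on its producer-created future instead of failing.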
// override
void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
    if (state_ != Ready) {
        if (callback) {
            callback(ResultAlreadyClosed, msg.getMessageId());
        }
        return;
    }

    // get partition for this message from router policy
    Lock producersLock(producersMutex_);
    short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_));
    if (partition >= getNumPartitions() || partition >= producers_.size()) {
        LOG_ERROR("Got Invalid Partition for message from Router Policy, Partition - " << partition);
        // change me: abort or notify failure in callback?
        //            change to appropriate error if callback
        if (callback) {
            callback(ResultUnknownError, msg.getMessageId());
        }
        return;
    }
    // find a producer for that partition, index should start from 0
    ProducerImplPtr producer = producers_[partition];

    // if the producer is not started (lazy producer), then kick-off the start process
    if (!producer->isStarted()) {
        producer->start();
    }
    producersLock.unlock();

    // send message on that partition
    if (!conf_.getLazyStartPartitionedProducers() || producer->ready()) {
        producer->sendAsync(msg, std::move(callback));
    } else {
        // Wrapping the callback into a lambda has overhead, so we check if the producer is ready first
        producer->getProducerCreatedFuture().addListener(
            [msg, callback](Result result, ProducerImplBaseWeakPtr weakProducer) {
                if (result == ResultOk) {
                    weakProducer.lock()->sendAsync(msg, std::move(callback));
                } else if (callback) {
                    callback(result, {});
                }
            });
    }
}

// override
void PartitionedProducerImpl::shutdown() {
    cancelTimers();
    interceptors_->close();
    auto client = client_.lock();
    if (client) {
        client->cleanupProducer(this);
    }
    partitionedProducerCreatedPromise_.setFailed(ResultAlreadyClosed);
    state_ = Closed;
}

const std::string& PartitionedProducerImpl::getProducerName() const {
    Lock producersLock(producersMutex_);
    return producers_[0]->getProducerName();
}

const std::string& PartitionedProducerImpl::getSchemaVersion() const {
    Lock producersLock(producersMutex_);
    // Since the schema is atomically assigned on the partitioned-topic,
    // it's guaranteed that all the partitions will have the same schema version.
    return producers_[0]->getSchemaVersion();
}

int64_t PartitionedProducerImpl::getLastSequenceId() const {
    int64_t currentMax = -1L;
    Lock producersLock(producersMutex_);
    for (int i = 0; i < producers_.size(); i++) {
        currentMax = std::max(currentMax, producers_[i]->getLastSequenceId());
    }

    return currentMax;
}

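// Closes all partition producers. The wrapped `closeCallback` runs shutdown() and then the user
// callback once every partition has reported back (or immediately if none were still open).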
/*
 * if createProducerCallback is set, it means the closeAsync is called from CreateProducer API which
 * failed to create one or many producers for partitions. So, we have to notify with ERROR on
 * createProducerFailure
 */
void PartitionedProducerImpl::closeAsync(CloseCallback originalCallback) {
    auto closeCallback = [this, originalCallback](Result result) {
        if (result == ResultOk) {
            shutdown();
        }
        if (originalCallback) {
            originalCallback(result);
        }
    };
    if (state_ == Closed || state_.exchange(Closing) == Closing) {
        closeCallback(ResultAlreadyClosed);
        return;
    }
    cancelTimers();

    unsigned int producerAlreadyClosed = 0;

    // Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be
    // increased when `state_` is Ready
    for (auto& producer : producers_) {
        if (!producer->isClosed()) {
            auto self = shared_from_this();
            const auto partition = static_cast<unsigned int>(producer->partition());
            producer->closeAsync([this, self, partition, closeCallback](Result result) {
                handleSinglePartitionProducerClose(result, partition, closeCallback);
            });
        } else {
            producerAlreadyClosed++;
        }
    }
    const auto numProducers = producers_.size();

    /*
     * No need to set state since:-
     * a. If closeAsync before creation then state == Closed, since producers_.size() =
     *    producerAlreadyClosed = 0
     * b. If closeAsync called after all sub partitioned producer connected -
     *    handleSinglePartitionProducerClose handles the closing
     * c. If closeAsync called due to failure in creating just one sub producer then state is set by
     *    handleSinglePartitionProducerCreated
     */
    if (producerAlreadyClosed == numProducers) {
        closeCallback(ResultOk);
    }
}

// `callback` is a wrapper of user provided callback, it's not null and will call `shutdown()`
void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
                                                                 const unsigned int partitionIndex,
                                                                 CloseCallback callback) {
    if (state_ == Failed) {
        // we should have already notified the client by callback
        return;
    }
    if (result != ResultOk) {
        LOG_ERROR("Closing the producer failed for partition - " << partitionIndex);
        callback(result);
        state_ = Failed;
        return;
    }
    assert(partitionIndex < getNumPartitionsWithLock());
    if (numProducersCreated_ > 0) {
        numProducersCreated_--;
    }
    // closed all successfully
    if (!numProducersCreated_) {
        // set the producerCreatedPromise to failure, if client called
        // closeAsync and it's not failure to create producer, the promise
        // is set second time here, first time it was successful. So check
        // if there's any adverse effect of setting it again. It should not
        // be but must check. MUSTCHECK changeme
        partitionedProducerCreatedPromise_.setFailed(ResultUnknownError);
        callback(result);
        return;
    }
}

// override
Future<Result, ProducerImplBaseWeakPtr> PartitionedProducerImpl::getProducerCreatedFuture() {
    return partitionedProducerCreatedPromise_.getFuture();
}

// override
bool PartitionedProducerImpl::isClosed() { return state_ == Closed; }

void PartitionedProducerImpl::triggerFlush() {
    Lock producersLock(producersMutex_);
    for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
        if ((*prod)->isStarted()) {
            (*prod)->triggerFlush();
        }
    }
}

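// Flushes every started partition producer. A shared promise coalesces concurrent flushAsync()
// calls: if a flush is already in progress, the new callback is simply attached as a listener.
// The user callback fires once all partitions have completed their flush.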
void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
    if (!flushPromise_ || flushPromise_->isComplete()) {
        flushPromise_ = std::make_shared<Promise<Result, bool>>();
    } else {
        // already in flushing, register a listener callback
        auto listenerCallback = [callback](Result result, bool v) {
            if (v) {
                callback(ResultOk);
            } else {
                callback(ResultUnknownError);
            }
            return;
        };

        flushPromise_->getFuture().addListener(listenerCallback);
        return;
    }

    Lock producersLock(producersMutex_);
    const int numProducers = static_cast<int>(producers_.size());
    FlushCallback subFlushCallback = [this, callback, numProducers](Result result) {
        // We shouldn't lock `producersMutex_` here because `subFlushCallback` may be called in
        // `ProducerImpl::flushAsync`, and then deadlock occurs.
        int previous = flushedPartitions_.fetch_add(1);
        if (previous == numProducers - 1) {
            flushedPartitions_.store(0);
            flushPromise_->setValue(true);
            callback(result);
        }
        return;
    };

    for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
        if ((*prod)->isStarted()) {
            (*prod)->flushAsync(subFlushCallback);
        } else {
            subFlushCallback(ResultOk);
        }
    }
}

void PartitionedProducerImpl::runPartitionUpdateTask() {
    auto weakSelf = weak_from_this();
    partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
    partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
        auto self = weakSelf.lock();
        if (self) {
            self->getPartitionMetadata();
        }
    });
}

void PartitionedProducerImpl::getPartitionMetadata() {
    using namespace std::placeholders;
    auto weakSelf = weak_from_this();
    lookupServicePtr_->getPartitionMetadataAsync(topicName_)
        .addListener([weakSelf](Result result, const LookupDataResultPtr& lookupDataResult) {
            auto self = weakSelf.lock();
            if (self) {
                self->handleGetPartitions(result, lookupDataResult);
            }
        });
}

void PartitionedProducerImpl::handleGetPartitions(Result result,
                                                  const LookupDataResultPtr& lookupDataResult) {
    if (state_ != Ready) {
        return;
    }

    if (!result) {
        const auto newNumPartitions = static_cast<unsigned int>(lookupDataResult->getPartitions());
        Lock producersLock(producersMutex_);
        const auto currentNumPartitions = getNumPartitions();
        assert(currentNumPartitions == producers_.size());
        if (newNumPartitions > currentNumPartitions) {
            LOG_INFO("new partition count: " << newNumPartitions);
            topicMetadata_.reset(new TopicMetadataImpl(newNumPartitions));

            std::vector<ProducerImplPtr> producers;
            auto lazy = conf_.getLazyStartPartitionedProducers() &&
                        conf_.getAccessMode() == ProducerConfiguration::Shared;
            for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
                ProducerImplPtr producer;
                try {
                    producer = newInternalProducer(i, lazy, true);
                } catch (const std::runtime_error& e) {
                    LOG_ERROR("Failed to create producer for partition " << i << ": " << e.what());
                    producers.clear();
                    break;
                }
                producers.emplace_back(producer);
            }
            if (producers.empty()) {
                runPartitionUpdateTask();
                return;
            }
            for (unsigned int i = 0; i < producers.size(); i++) {
                auto&& producer = producers[i];
                producers_.emplace_back(producer);
                if (!lazy) {
                    producer->start();
                }
            }
            producersLock.unlock();
            interceptors_->onPartitionsChange(getTopic(), newNumPartitions);
            // `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`
            return;
        }
    } else {
        LOG_WARN("Failed to getPartitionMetadata: " << strResult(result));
    }

    runPartitionUpdateTask();
}

bool PartitionedProducerImpl::isConnected() const {
    if (state_ != Ready) {
        return false;
    }

    Lock producersLock(producersMutex_);
    const auto producers = producers_;
    producersLock.unlock();
    for (const auto& producer : producers) {
        if (producer->isStarted() && !producer->isConnected()) {
            return false;
        }
    }
    return true;
}

uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() {
    uint64_t numberOfConnectedProducer = 0;
    Lock producersLock(producersMutex_);
    const auto producers = producers_;
    producersLock.unlock();
    for (const auto& producer : producers) {
        if (producer->isConnected()) {
            numberOfConnectedProducer++;
        }
    }
    return numberOfConnectedProducer;
}

void PartitionedProducerImpl::cancelTimers() noexcept {
    if (partitionsUpdateTimer_) {
        ASIO_ERROR ec;
        partitionsUpdateTimer_->cancel(ec);
    }
}

}  // namespace pulsar