lib/ProducerImpl.h (143 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #ifndef LIB_PRODUCERIMPL_H_ #define LIB_PRODUCERIMPL_H_ #include "TimeUtils.h" #ifdef USE_ASIO #include <asio/steady_timer.hpp> #else #include <boost/asio/steady_timer.hpp> #endif #include <atomic> #include <boost/optional.hpp> #include <list> #include <memory> #include "Future.h" #include "HandlerBase.h" // In MSVC and macOS, the value type of STL container cannot be forward declared #if defined(_MSC_VER) || defined(__APPLE__) #include "OpSendMsg.h" #endif #include "AsioDefines.h" #include "PendingFailures.h" #include "PeriodicTask.h" #include "ProducerImplBase.h" namespace pulsar { class BatchMessageContainerBase; class ClientImpl; using ClientImplPtr = std::shared_ptr<ClientImpl>; using DeadlineTimerPtr = std::shared_ptr<ASIO::steady_timer>; class MessageCrypto; using MessageCryptoPtr = std::shared_ptr<MessageCrypto>; class ProducerImpl; using ProducerImplWeakPtr = std::weak_ptr<ProducerImpl>; class ProducerStatsBase; using ProducerStatsBasePtr = std::shared_ptr<ProducerStatsBase>; struct ResponseData; class ProducerImpl; using ProducerImplPtr = std::shared_ptr<ProducerImpl>; class PulsarFriend; class Producer; class MemoryLimitController; class Semaphore; class TopicName; struct OpSendMsg; namespace proto { class MessageMetadata; } // namespace proto class ProducerImpl : public HandlerBase, public ProducerImplBase { public: ProducerImpl(ClientImplPtr client, const TopicName& topic, const ProducerConfiguration& producerConfiguration, const ProducerInterceptorsPtr& interceptors, int32_t partition = -1, bool retryOnCreationError = false); ~ProducerImpl(); // overrided methods from ProducerImplBase const std::string& getProducerName() const override; int64_t getLastSequenceId() const override; const std::string& getSchemaVersion() const override; void sendAsync(const Message& msg, SendCallback callback) override; void closeAsync(CloseCallback callback) override; void start() override; void shutdown() override; bool isClosed() override; const std::string& getTopic() const override; Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture() override; void triggerFlush() override; void flushAsync(FlushCallback callback) override; bool isConnected() const override; uint64_t getNumberOfConnectedProducer() override; bool isStarted() const; bool removeCorruptMessage(uint64_t sequenceId); bool ackReceived(uint64_t sequenceId, MessageId& messageId); virtual void disconnectProducer(const boost::optional<std::string>& assignedBrokerUrl); virtual void disconnectProducer(); uint64_t getProducerId() const; int32_t partition() const noexcept { return partition_; } static int getNumOfChunks(uint32_t size, uint32_t maxMessageSize); ProducerImplPtr shared_from_this() noexcept { return std::dynamic_pointer_cast<ProducerImpl>(HandlerBase::shared_from_this()); } ProducerImplWeakPtr weak_from_this() noexcept { return shared_from_this(); } bool ready() const { return producerCreatedPromise_.isComplete(); } protected: ProducerStatsBasePtr producerStatsBasePtr_; void setMessageMetadata(const Message& msg, const uint64_t& sequenceId, const uint32_t& uncompressedSize); void sendMessage(std::unique_ptr<OpSendMsg> opSendMsg); void startSendTimeoutTimer(); friend class PulsarFriend; friend class Producer; friend class BatchMessageContainerBase; friend class BatchMessageContainer; // overrided methods from HandlerBase void beforeConnectionChange(ClientConnection& connection) override; Future<Result, bool> connectionOpened(const ClientConnectionPtr& connection) override; void connectionFailed(Result result) override; const std::string& getName() const override { return producerStr_; } private: void printStats(); Result handleCreateProducer(const ClientConnectionPtr& cnx, Result result, const ResponseData& responseData); void resendMessages(ClientConnectionPtr cnx); void refreshEncryptionKey(const ASIO_ERROR& ec); bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload, SharedBuffer& encryptedPayload); void sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& callback); /** * Reserve a spot in the messages queue before acquiring the ProducerImpl mutex. When the queue is full, * this call will block until a spot is available if blockIfQueueIsFull is true. Otherwise, it will return * ResultProducerQueueIsFull immediately. * * It also checks whether the memory could reach the limit after `payloadSize` is added. If so, this call * will block until enough memory could be retained. */ Result canEnqueueRequest(uint32_t payloadSize); void releaseSemaphore(uint32_t payloadSize); void releaseSemaphoreForSendOp(const OpSendMsg& op); void cancelTimers() noexcept; bool isValidProducerState(const SendCallback& callback) const; bool canAddToBatch(const Message& msg) const; typedef std::unique_lock<std::mutex> Lock; ProducerConfiguration conf_; std::unique_ptr<Semaphore> semaphore_; std::list<std::unique_ptr<OpSendMsg>> pendingMessagesQueue_; const int32_t partition_; // -1 if topic is non-partitioned std::string producerName_; bool userProvidedProducerName_; std::string producerStr_; uint64_t producerId_; std::unique_ptr<BatchMessageContainerBase> batchMessageContainer_; DeadlineTimerPtr batchTimer_; PendingFailures batchMessageAndSend(const FlushCallback& flushCallback = nullptr); std::atomic<int64_t> lastSequenceIdPublished_; std::atomic<int64_t> msgSequenceGenerator_; std::string schemaVersion_; DeadlineTimerPtr sendTimer_; void handleSendTimeout(const ASIO_ERROR& err); using DurationType = TimeDuration; void asyncWaitSendTimeout(DurationType expiryTime); Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_; struct PendingCallbacks; decltype(pendingMessagesQueue_) getPendingCallbacksWhenFailed(); decltype(pendingMessagesQueue_) getPendingCallbacksWhenFailedWithLock(); void failPendingMessages(Result result, bool withLock); MessageCryptoPtr msgCrypto_; PeriodicTask dataKeyRefreshTask_; MemoryLimitController& memoryLimitController_; const bool chunkingEnabled_; boost::optional<uint64_t> topicEpoch; ProducerInterceptorsPtr interceptors_; bool retryOnCreationError_; }; struct ProducerImplCmp { bool operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const; }; } /* namespace pulsar */ #endif /* LIB_PRODUCERIMPL_H_ */