lib/PartitionedProducerImpl.h (92 lines of code) (raw):
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
#include <pulsar/MessageRoutingPolicy.h>
#include <pulsar/TopicMetadata.h>
#include <atomic>
#include <boost/asio/deadline_timer.hpp>
#include <memory>
#include <mutex>
#include <vector>
#include "LookupDataResult.h"
#include "ProducerImplBase.h"
#include "ProducerInterceptors.h"
namespace pulsar {
// Forward declarations: only pointers to these types are needed in this header,
// so their full definitions are not included here.
class ClientImpl;
class ExecutorService;
class LookupService;
class ProducerImpl;
class TopicName;

// Smart-pointer aliases for the forward-declared types.
using ClientImplPtr = std::shared_ptr<ClientImpl>;
using ClientImplWeakPtr = std::weak_ptr<ClientImpl>;
using ExecutorServicePtr = std::shared_ptr<ExecutorService>;
using LookupServicePtr = std::shared_ptr<LookupService>;
using ProducerImplPtr = std::shared_ptr<ProducerImpl>;
using TopicNamePtr = std::shared_ptr<TopicName>;

// Shared handle to a boost::asio deadline timer.
using DeadlineTimerPtr = std::shared_ptr<boost::asio::deadline_timer>;
class PartitionedProducerImpl : public ProducerImplBase,
                                public std::enable_shared_from_this<PartitionedProducerImpl> {
   public:
    enum State
    {
        Pending,
        Ready,
        Closing,
        Closed,
        Failed
    };
    const static std::string PARTITION_NAME_SUFFIX;
    typedef std::unique_lock<std::mutex> Lock;
    PartitionedProducerImpl(ClientImplPtr ptr, const TopicNamePtr topicName, const unsigned int numPartitions,
                            const ProducerConfiguration& config, const ProducerInterceptorsPtr& interceptors);
    virtual ~PartitionedProducerImpl();
    // overrided methods from ProducerImplBase
    const std::string& getProducerName() const override;
    int64_t getLastSequenceId() const override;
    const std::string& getSchemaVersion() const override;
    void sendAsync(const Message& msg, SendCallback callback) override;
    /*
     * closes all active producers, it can be called explicitly from client as well as createProducer
     * when it fails to create one of the producers and we want to fail createProducer
     */
    void closeAsync(CloseCallback callback) override;
    void start() override;
    void shutdown() override;
    bool isClosed() override;
    const std::string& getTopic() const override;
    Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture() override;
    void triggerFlush() override;
    void flushAsync(FlushCallback callback) override;
    bool isConnected() const override;
    uint64_t getNumberOfConnectedProducer() override;
    void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
                                              const unsigned int partitionIndex);
    void createLazyPartitionProducer(const unsigned int partitionIndex);
    void handleSinglePartitionProducerClose(Result result, const unsigned int partitionIndex,
                                            CloseCallback callback);
    void notifyResult(CloseCallback closeCallback);
    std::weak_ptr<PartitionedProducerImpl> weak_from_this() noexcept { return shared_from_this(); }
    friend class PulsarFriend;
   private:
    ClientImplWeakPtr client_;
    const TopicNamePtr topicName_;
    const std::string topic_;
    std::atomic_uint numProducersCreated_{0};
    /*
     * set when one or more Single Partition Creation fails, close will cleanup and fail the create callbackxo
     */
    bool cleanup_ = false;
    ProducerConfiguration conf_;
    typedef std::vector<ProducerImplPtr> ProducerList;
    ProducerList producers_;
    // producersMutex_ is used to share producers_ and topicMetadata_
    mutable std::mutex producersMutex_;
    MessageRoutingPolicyPtr routerPolicy_;
    std::atomic<State> state_{Pending};
    // only set this promise to value, when producers on all partitions are created.
    Promise<Result, ProducerImplBaseWeakPtr> partitionedProducerCreatedPromise_;
    std::unique_ptr<TopicMetadata> topicMetadata_;
    std::atomic<int> flushedPartitions_;
    std::shared_ptr<Promise<Result, bool>> flushPromise_;
    ExecutorServicePtr listenerExecutor_;
    DeadlineTimerPtr partitionsUpdateTimer_;
    boost::posix_time::time_duration partitionsUpdateInterval_;
    LookupServicePtr lookupServicePtr_;
    ProducerInterceptorsPtr interceptors_;
    unsigned int getNumPartitions() const;
    unsigned int getNumPartitionsWithLock() const;
    ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy);
    MessageRoutingPolicyPtr getMessageRouter();
    void runPartitionUpdateTask();
    void getPartitionMetadata();
    void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata);
    void cancelTimers() noexcept;
};
}  // namespace pulsar