include/pulsar/ProducerConfiguration.h (109 lines of code) (raw):

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #ifndef PULSAR_PRODUCERCONFIGURATION_H_ #define PULSAR_PRODUCERCONFIGURATION_H_ #include <pulsar/CompressionType.h> #include <pulsar/CryptoKeyReader.h> #include <pulsar/Message.h> #include <pulsar/MessageRoutingPolicy.h> #include <pulsar/ProducerCryptoFailureAction.h> #include <pulsar/ProducerInterceptor.h> #include <pulsar/Result.h> #include <pulsar/Schema.h> #include <pulsar/defines.h> #include <functional> #include <set> namespace pulsar { typedef std::function<void(Result, const MessageId& messageId)> SendCallback; typedef std::function<void(Result)> CloseCallback; struct ProducerConfigurationImpl; class PulsarWrapper; /** * Class that holds the configuration for a producer */ class PULSAR_PUBLIC ProducerConfiguration { public: enum PartitionsRoutingMode { UseSinglePartition, RoundRobinDistribution, CustomPartition }; enum HashingScheme { Murmur3_32Hash, BoostHash, JavaStringHash }; enum BatchingType { /** * Default batching. * * <p>incoming single messages: * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) * * <p>batched into single batch message: * [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)] */ DefaultBatching, /** * Key based batching. * * <p>incoming single messages: * (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3) * * <p>batched into single batch message: * [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)] */ KeyBasedBatching }; enum ProducerAccessMode { /** * By default multiple producers can publish on a topic. */ Shared = 0, /** * Require exclusive access for producer. Fail immediately if there's already a producer connected. */ Exclusive = 1, /** * Producer creation is pending until it can acquire exclusive access. */ WaitForExclusive = 2, /** * Acquire exclusive access for the producer. Any existing producer will be removed and * invalidated immediately. */ ExclusiveWithFencing = 3 }; ProducerConfiguration(); ~ProducerConfiguration(); ProducerConfiguration(const ProducerConfiguration&); ProducerConfiguration& operator=(const ProducerConfiguration&); /** * Set the producer name which could be assigned by the system or specified by the client. * * @param producerName producer name. * @return */ ProducerConfiguration& setProducerName(const std::string& producerName); /** * The getter associated with setProducerName(). */ const std::string& getProducerName() const; /** * Declare the schema of the data that will be published by this producer. * * The schema will be checked against the schema of the topic, and it * will fail if it's not compatible, though the client library will * not perform any validation that the actual message payload are * conforming to the specified schema. * * For all purposes, this * @param schemaInfo * @return */ ProducerConfiguration& setSchema(const SchemaInfo& schemaInfo); /** * @return the schema information declared for this producer */ const SchemaInfo& getSchema() const; /** * The getter associated with getSendTimeout() */ ProducerConfiguration& setSendTimeout(int sendTimeoutMs); /** * Get the send timeout is milliseconds. * * If a message is not acknowledged by the server before the sendTimeout expires, an error will be * reported. * * If the timeout is zero, there will be no timeout. * * @return the send timeout in milliseconds (Default: 30000) */ int getSendTimeout() const; /** * Set the baseline of the sequence ID for messages published by the producer. * <p> * The first message uses (initialSequenceId + 1) as its sequence ID and subsequent messages are assigned * incremental sequence IDs. * * Default: -1, which means the first message's sequence ID is 0. * * @param initialSequenceId the initial sequence ID for the producer. * @return */ ProducerConfiguration& setInitialSequenceId(int64_t initialSequenceId); /** * The getter associated with setInitialSequenceId(). */ int64_t getInitialSequenceId() const; /** * Set the compression type for the producer. * <p> * By default, message payloads are not compressed. Supported compression types are: * <ul> * * <li>{@link CompressionNone}: No compression</li> * <li>{@link CompressionLZ4}: LZ4 Compression https://lz4.github.io/lz4/ * <li>{@link CompressionZLib}: ZLib Compression http://zlib.net/</li> * <li>{@link CompressionZSTD}: Zstandard Compression https://facebook.github.io/zstd/ (Since Pulsar 2.3. * Zstd cannot be used if consumer applications are not in version >= 2.3 as well)</li> * <li>{@link CompressionSNAPPY}: Snappy Compression https://google.github.io/snappy/ (Since Pulsar 2.4. * Snappy cannot be used if consumer applications are not in version >= 2.4 as well)</li> * </ul> */ ProducerConfiguration& setCompressionType(CompressionType compressionType); /** * The getter associated with setCompressionType(). */ CompressionType getCompressionType() const; /** * Set the max size of the queue holding the messages pending to receive an acknowledgment from the * broker. <p> When the queue is full, by default, all calls to Producer::send and Producer::sendAsync * would fail unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the * blocking behavior. * * Default: 1000 * * @param maxPendingMessages max number of pending messages. * @return */ ProducerConfiguration& setMaxPendingMessages(int maxPendingMessages); /** * The getter associated with setMaxPendingMessages(). */ int getMaxPendingMessages() const; /** * Set the number of max pending messages across all the partitions * <p> * This setting will be used to lower the max pending messages for each partition * ({@link #setMaxPendingMessages(int)}), if the total exceeds the configured value. * * Default: 50000 * * @param maxPendingMessagesAcrossPartitions */ ProducerConfiguration& setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions); /** * @return the maximum number of pending messages allowed across all the partitions */ int getMaxPendingMessagesAcrossPartitions() const; /** * Set the message routing modes for partitioned topics. * * Default: UseSinglePartition * * @param PartitionsRoutingMode partition routing mode. * @return */ ProducerConfiguration& setPartitionsRoutingMode(const PartitionsRoutingMode& mode); /** * The getter associated with setPartitionsRoutingMode(). */ PartitionsRoutingMode getPartitionsRoutingMode() const; /** * Set a custom message routing policy by passing an implementation of MessageRouter. * * @param messageRouter message router. * @return */ ProducerConfiguration& setMessageRouter(const MessageRoutingPolicyPtr& router); /** * The getter associated with setMessageRouter(). */ const MessageRoutingPolicyPtr& getMessageRouterPtr() const; /** * Set the hashing scheme, which is a standard hashing function available when choosing the partition * used for a particular message. * * Default: HashingScheme::BoostHash * * <p>Standard hashing functions available are: * <ul> * <li>{@link HashingScheme::JavaStringHash}: Java {@code String.hashCode()} (Default). * <li>{@link HashingScheme::BoostHash}: Use [Boost hashing * function](https://www.boost.org/doc/libs/1_72_0/doc/html/boost/hash.html). * <li>{@link HashingScheme::Murmur3_32Hash}: Use [Murmur3 hashing * function](https://en.wikipedia.org/wiki/MurmurHash"). * </ul> * * @param scheme hashing scheme. * @return */ ProducerConfiguration& setHashingScheme(const HashingScheme& scheme); /** * The getter associated with setHashingScheme(). */ HashingScheme getHashingScheme() const; /** * This config affects producers of partitioned topics only. It controls whether * producers register and connect immediately to the owner broker of each partition * or start lazily on demand. The internal producer of one partition is always * started eagerly, chosen by the routing policy, but the internal producers of * any additional partitions are started on demand, upon receiving their first * message. * Using this mode can reduce the strain on brokers for topics with large numbers of * partitions and when the SinglePartition routing policy is used without keyed messages. * Because producer connection can be on demand, this can produce extra send latency * for the first messages of a given partition. * @param true/false as to whether to start partition producers lazily * @return */ ProducerConfiguration& setLazyStartPartitionedProducers(bool); /** * The getter associated with setLazyStartPartitionedProducers() */ bool getLazyStartPartitionedProducers() const; /** * The setter associated with getBlockIfQueueFull() */ ProducerConfiguration& setBlockIfQueueFull(bool); /** * @return whether Producer::send or Producer::sendAsync operations should block when the outgoing message * queue is full. (Default: false) */ bool getBlockIfQueueFull() const; // Zero queue size feature will not be supported on consumer end if batching is enabled /** * Control whether automatic batching of messages is enabled or not for the producer. * * Default: true * * When automatic batching is enabled, multiple calls to Producer::sendAsync can result in a single batch * to be sent to the broker, leading to better throughput, especially when publishing small messages. If * compression is enabled, messages are compressed at the batch level, leading to a much better * compression ratio for similar headers or contents. * * When the default batch delay is set to 10 ms and the default batch size is 1000 messages. * * @see ProducerConfiguration::setBatchingMaxPublishDelayMs * */ ProducerConfiguration& setBatchingEnabled(const bool& batchingEnabled); /** * Return the flag whether automatic message batching is enabled or not for the producer. * * @return true if automatic message batching is enabled. Otherwise it returns false. * @since 2.0.0 <br> * It is enabled by default. */ const bool& getBatchingEnabled() const; /** * Set the max number of messages permitted in a batch. <i>Default value: 1000.</i> If you set this option * to a value greater than 1, messages are queued until this threshold is reached or batch interval has * elapsed. * * All messages in a batch are published as * a single batch message. The consumer is delivered individual messages in the batch in the same * order they are enqueued. * @param batchMessagesMaxMessagesPerBatch max number of messages permitted in a batch * @return */ ProducerConfiguration& setBatchingMaxMessages(const unsigned int& batchingMaxMessages); /** * The getter associated with setBatchingMaxMessages(). */ const unsigned int& getBatchingMaxMessages() const; /** * Set the max size of messages permitted in a batch. * <i>Default value: 128 KB.</i> If you set this option to a value greater than 1, * messages are queued until this threshold is reached or * batch interval has elapsed. * * <p>All messages in a batch are published as a single batch message. * The consumer is delivered individual * messages in the batch in the same order they are enqueued. * * @param batchingMaxAllowedSizeInBytes */ ProducerConfiguration& setBatchingMaxAllowedSizeInBytes( const unsigned long& batchingMaxAllowedSizeInBytes); /** * The getter associated with setBatchingMaxAllowedSizeInBytes(). */ const unsigned long& getBatchingMaxAllowedSizeInBytes() const; /** * Set the max time for message publish delay permitted in a batch. * <i>Default value: 10 ms.</i> * * @param batchingMaxPublishDelayMs max time for message publish delay permitted in a batch. * @return */ ProducerConfiguration& setBatchingMaxPublishDelayMs(const unsigned long& batchingMaxPublishDelayMs); /** * The getter associated with setBatchingMaxPublishDelayMs(). */ const unsigned long& getBatchingMaxPublishDelayMs() const; /** * Default: DefaultBatching * * @see BatchingType */ ProducerConfiguration& setBatchingType(BatchingType batchingType); /** * @return batching type. * @see BatchingType. */ BatchingType getBatchingType() const; /** * The getter associated with setCryptoKeyReader(). */ const CryptoKeyReaderPtr getCryptoKeyReader() const; /** * Set the shared pointer to CryptoKeyReader. * * @param shared pointer to CryptoKeyReader. * @return */ ProducerConfiguration& setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader); /** * The getter associated with setCryptoFailureAction(). */ ProducerCryptoFailureAction getCryptoFailureAction() const; /** * Sets the ProducerCryptoFailureAction to the value specified. * * @param action * the action taken by the producer in case of encryption failures. * @return */ ProducerConfiguration& setCryptoFailureAction(ProducerCryptoFailureAction action); /** * @return all the encryption keys added */ const std::set<std::string>& getEncryptionKeys() const; /** * @return true if encryption keys are added */ bool isEncryptionEnabled() const; /** * Add public encryption key, used by producer to encrypt the data key. * * At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If * keys are found, a callback getKey(String keyName) is invoked against each key to load the values of the * key. Application should implement this callback to return the key in pkcs8 format. If compression is * enabled, message is encrypted after compression. If batch messaging is enabled, the batched message is * encrypted. * * @key the encryption key to add * @return the ProducerConfiguration self */ ProducerConfiguration& addEncryptionKey(std::string key); /** * Check whether the producer has a specific property attached. * * @param name the name of the property to check * @return true if the message has the specified property * @return false if the property is not defined */ bool hasProperty(const std::string& name) const; /** * Get the value of a specific property * * @param name the name of the property * @return the value of the property or null if the property was not defined */ const std::string& getProperty(const std::string& name) const; /** * Get all the properties attached to this producer. */ std::map<std::string, std::string>& getProperties() const; /** * Sets a new property on the producer * . * @param name the name of the property * @param value the associated value */ ProducerConfiguration& setProperty(const std::string& name, const std::string& value); /** * Add all the properties in the provided map */ ProducerConfiguration& setProperties(const std::map<std::string, std::string>& properties); /** * If message size is higher than allowed max publish-payload size by broker then enableChunking helps * producer to split message into multiple chunks and publish them to broker separately in order. So, it * allows client to successfully publish large size of messages in pulsar. * * Set it true to enable this feature. If so, you must disable batching (see setBatchingEnabled), * otherwise the producer creation will fail. * * There are some other recommendations when it's enabled: * 1. This features is right now only supported for non-shared subscription and persistent-topic. * 2. It's better to reduce setMaxPendingMessages to avoid producer accupying large amount of memory by * buffered messages. * 3. Set message-ttl on the namespace to cleanup chunked messages. Sometimes due to broker-restart or * publish time, producer might fail to publish entire large message. So, consumer will not be able to * consume and ack those messages. * * Default: false * * @param chunkingEnabled whether chunking is enabled * @return the ProducerConfiguration self */ ProducerConfiguration& setChunkingEnabled(bool chunkingEnabled); /** * The getter associated with setChunkingEnabled(). */ bool isChunkingEnabled() const; /** * Set the type of access mode that the producer requires on the topic. * * @see ProducerAccessMode * @param accessMode * The type of access to the topic that the producer requires */ ProducerConfiguration& setAccessMode(const ProducerAccessMode& accessMode); /** * Get the type of access mode that the producer requires on the topic. */ ProducerAccessMode getAccessMode() const; ProducerConfiguration& intercept(const std::vector<ProducerInterceptorPtr>& interceptors); const std::vector<ProducerInterceptorPtr>& getInterceptors() const; private: std::shared_ptr<ProducerConfigurationImpl> impl_; friend class PulsarWrapper; friend class ConsumerImpl; friend class ProducerImpl; }; } // namespace pulsar #endif /* PULSAR_PRODUCERCONFIGURATION_H_ */