pulsar/producer.go (71 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package pulsar import ( "context" "time" "github.com/apache/pulsar-client-go/pulsar/backoff" ) type HashingScheme int const ( // JavaStringHash and Java String.hashCode() equivalent JavaStringHash HashingScheme = iota // Murmur3_32Hash use Murmur3 hashing function Murmur3_32Hash ) type CompressionType int const ( NoCompression CompressionType = iota LZ4 ZLib ZSTD ) type CompressionLevel int const ( // Default compression level Default CompressionLevel = iota // Faster compression, with lower compression ratio Faster // Higher compression rate, but slower Better ) type ProducerAccessMode int const ( // ProducerAccessModeShared is default multiple producers can publish on a topic. ProducerAccessModeShared ProducerAccessMode = iota // ProducerAccessModeExclusive is required exclusive access for producer. // Fail immediately if there's already a producer connected. ProducerAccessModeExclusive // ProducerAccessModeWaitForExclusive is pending until producer can acquire exclusive access. ProducerAccessModeWaitForExclusive ) // TopicMetadata represents a topic metadata. type TopicMetadata interface { // NumPartitions returns the number of partitions for a particular topic. NumPartitions() uint32 } type ProducerOptions struct { // Topic specifies the topic this producer will be publishing on. // This argument is required when constructing the producer. Topic string // Name specifies a name for the producer. // If not assigned, the system will generate a globally unique name which can be access with // Producer.ProducerName(). // When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique // across all Pulsar's clusters. Brokers will enforce that only a single producer a given name can be publishing on // a topic. Name string // Properties specifies a set of application defined properties for the producer. // This properties will be visible in the topic stats Properties map[string]string // SendTimeout specifies the timeout for a message that has not been acknowledged by the server since sent. // Send and SendAsync returns an error after timeout. // Default is 30 seconds, negative such as -1 to disable. SendTimeout time.Duration // DisableBlockIfQueueFull controls whether Send and SendAsync block if producer's message queue is full. // Default is false, if set to true then Send and SendAsync return error when queue is full. DisableBlockIfQueueFull bool // MaxPendingMessages specifies the max size of the queue holding the messages pending to receive an // acknowledgment from the broker. MaxPendingMessages int // HashingScheme is used to define the partition on where to publish a particular message. // Standard hashing functions available are: // // - `JavaStringHash` : Java String.hashCode() equivalent // - `Murmur3_32Hash` : Use Murmur3 hashing function. // https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash // // Default is `JavaStringHash`. HashingScheme // CompressionType specifies the compression type for the producer. // By default, message payloads are not compressed. Supported compression types are: // - LZ4 // - ZLIB // - ZSTD // // Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that // release in order to be able to receive messages compressed with ZSTD. CompressionType // CompressionLevel defines the desired compression level. Options: // - Default // - Faster // - Better CompressionLevel // MessageRouter represents a custom message routing policy by passing an implementation of MessageRouter // The router is a function that given a particular message and the topic metadata, returns the // partition index where the message should be routed to MessageRouter func(*ProducerMessage, TopicMetadata) int // DisableBatching controls whether automatic batching of messages is enabled for the producer. By default batching // is enabled. // When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the // broker, leading to better throughput, especially when publishing small messages. If compression is enabled, // messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or // contents. // When enabled default batch delay is set to 1 ms and default batch size is 1000 messages // Setting `DisableBatching: true` will make the producer to send messages individually DisableBatching bool // BatchingMaxPublishDelay specifies the time period within which the messages sent will be batched (default: 10ms) // if batch messages are enabled. If set to a non zero value, messages will be queued until this time // interval or until BatchingMaxPublishDelay time.Duration // BatchingMaxMessages specifies the maximum number of messages permitted in a batch. (default: 1000) // If set to a value greater than 1, messages will be queued until this threshold is reached or // BatchingMaxSize (see below) has been reached or the batch interval has elapsed. BatchingMaxMessages uint // BatchingMaxSize specifies the maximum number of bytes permitted in a batch. (default 128 KB) // If set to a value greater than 1, messages will be queued until this threshold is reached or // BatchingMaxMessages (see above) has been reached or the batch interval has elapsed. BatchingMaxSize uint // Interceptors is a chain of interceptors, These interceptors will be called at some points defined // in ProducerInterceptor interface Interceptors ProducerInterceptors // Schema represents the schema implementation. Schema Schema // MaxReconnectToBroker specifies the maximum retry number of reconnectToBroker. (default: ultimate) MaxReconnectToBroker *uint // BackOffPolicyFunc parameterize the following options in the reconnection logic to // allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage) BackOffPolicyFunc func() backoff.Policy // BatcherBuilderType sets the batch builder type (default DefaultBatchBuilder) // This will be used to create batch container when batching is enabled. // Options: // - DefaultBatchBuilder // - KeyBasedBatchBuilder BatcherBuilderType // PartitionsAutoDiscoveryInterval is the time interval for the background process to discover new partitions // Default is 1 minute PartitionsAutoDiscoveryInterval time.Duration // Disable multiple Schame Version // Default false DisableMultiSchema bool // Encryption specifies the fields required to encrypt a message Encryption *ProducerEncryptionInfo // EnableChunking controls whether automatic chunking of messages is enabled for the producer. By default, chunking // is disabled. // Chunking can not be enabled when batching is enabled. EnableChunking bool // ChunkMaxMessageSize is the max size of single chunk payload. // It will actually only take effect if it is smaller than the maxMessageSize from the broker. ChunkMaxMessageSize uint // The type of access to the topic that the producer requires. (default ProducerAccessModeShared) // Options: // - ProducerAccessModeShared // - ProducerAccessModeExclusive ProducerAccessMode // initialSubscriptionName Name of the initial subscription name of the dead letter topic. // If this field is not set, the initial subscription for the dead letter topic will not be created. // If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer // will fail to be created. initialSubscriptionName string } // Producer is used to publish messages on a topic type Producer interface { // Topic return the topic to which producer is publishing to Topic() string // Name return the producer name which could have been assigned by the system or specified by the client Name() string // Send a message // This call will be blocking until is successfully acknowledged by the Pulsar broker. // Example: // producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload }) Send(context.Context, *ProducerMessage) (MessageID, error) // SendAsync a message in asynchronous mode // This call is blocked when the `maxPendingMessages` becomes full (default: 1000) // The callback will report back the message being published and // the eventual error in publishing SendAsync(context.Context, *ProducerMessage, func(MessageID, *ProducerMessage, error)) // LastSequenceID get the last sequence id that was published by this producer. // This represent either the automatically assigned or custom sequence id (set on the ProducerMessage) that // was published and acknowledged by the broker. // After recreating a producer with the same producer name, this will return the last message that was // published in the previous producer session, or -1 if there no message was ever published. // return the last sequence id published by this producer. LastSequenceID() int64 // Deprecated: Use `FlushWithCtx()` instead. Flush() error // Flush all the messages buffered in the client and wait until all messageshave been successfully // persisted. FlushWithCtx(ctx context.Context) error // Close the producer and releases resources allocated // No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case // of errors, pending writes will not be retried. Close() }