pulsar/producer_impl.go

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"

	"github.com/apache/pulsar-client-go/pulsar/crypto"
	"github.com/apache/pulsar-client-go/pulsar/internal"
	"github.com/apache/pulsar-client-go/pulsar/log"
)

const (
	// defaultSendTimeout is the default timeout for waiting for an ack after a message is sent.
	defaultSendTimeout = 30 * time.Second

	// defaultBatchingMaxPublishDelay is the default maximum delay before a batch of messages is published.
	defaultBatchingMaxPublishDelay = 10 * time.Millisecond

	// defaultMaxBatchSize is the default maximum number of bytes per batch.
	defaultMaxBatchSize = 128 * 1024

	// defaultMaxMessagesPerBatch is the default maximum number of entries per batch.
	defaultMaxMessagesPerBatch = 1000

	// defaultPartitionsAutoDiscoveryInterval is the default interval for automatic partition discovery.
	defaultPartitionsAutoDiscoveryInterval = 1 * time.Minute
)

type producer struct {
	sync.RWMutex
	client        *client
	options       *ProducerOptions
	topic         string
	producers     []Producer
	producersPtr  unsafe.Pointer
	numPartitions uint32
	messageRouter func(*ProducerMessage, TopicMetadata) int
	closeOnce     sync.Once
	stopDiscovery func()
	log           log.Logger
	metrics       *internal.LeveledMetrics
}

func getHashingFunction(s HashingScheme) func(string) uint32 {
	switch s {
	case JavaStringHash:
		return internal.JavaStringHash
	case Murmur3_32Hash:
		return internal.Murmur3_32Hash
	default:
		return internal.JavaStringHash
	}
}
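// For illustration only: a minimal sketch of the 31-based rolling hash behind
// JavaStringHash-style key routing. The real implementation lives in the
// internal package; the helper name below is hypothetical:
//
//	func exampleJavaStringHash(s string) uint32 {
//		h := uint32(0)
//		for i := 0; i < len(s); i++ {
//			h = 31*h + uint32(s[i])
//		}
//		return h
//	}
//
// A keyed message would then land on partition
// exampleJavaStringHash(key) % uint32(numPartitions), which keeps messages
// with the same key on a stable partition as long as the count is unchanged.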
func newProducer(client *client, options *ProducerOptions) (*producer, error) {
	if options.Topic == "" {
		return nil, newError(InvalidTopicName, "Topic name is required for producer")
	}

	if options.SendTimeout == 0 {
		options.SendTimeout = defaultSendTimeout
	}
	if options.BatchingMaxMessages == 0 {
		options.BatchingMaxMessages = defaultMaxMessagesPerBatch
	}
	if options.BatchingMaxSize == 0 {
		options.BatchingMaxSize = defaultMaxBatchSize
	}
	if options.BatchingMaxPublishDelay <= 0 {
		options.BatchingMaxPublishDelay = defaultBatchingMaxPublishDelay
	}
	if options.PartitionsAutoDiscoveryInterval <= 0 {
		options.PartitionsAutoDiscoveryInterval = defaultPartitionsAutoDiscoveryInterval
	}
	// Batching and chunking are mutually exclusive.
	if !options.DisableBatching && options.EnableChunking {
		return nil, fmt.Errorf("batching and chunking can not be enabled together")
	}

	p := &producer{
		options: options,
		topic:   options.Topic,
		client:  client,
		log:     client.log.SubLogger(log.Fields{"topic": options.Topic}),
		metrics: client.metrics.GetLeveledMetrics(options.Topic),
	}

	if options.Interceptors == nil {
		options.Interceptors = defaultProducerInterceptors
	}

	if options.MessageRouter == nil {
		internalRouter := NewDefaultRouter(
			getHashingFunction(options.HashingScheme),
			options.BatchingMaxMessages,
			options.BatchingMaxSize,
			options.BatchingMaxPublishDelay,
			options.DisableBatching)
		p.messageRouter = func(message *ProducerMessage, metadata TopicMetadata) int {
			return internalRouter(message, metadata.NumPartitions())
		}
	} else {
		p.messageRouter = options.MessageRouter
	}

	if options.Schema != nil && options.Schema.GetSchemaInfo() != nil {
		if options.Schema.GetSchemaInfo().Type == NONE {
			options.Schema = NewBytesSchema(nil)
		}
	}

	encryption := options.Encryption
	// Add a default MessageCrypto if encryption is enabled but none was provided.
	if encryption != nil && len(encryption.Keys) > 0 {
		if encryption.KeyReader == nil {
			return nil, fmt.Errorf("encryption is enabled, KeyReader can not be nil")
		}

		if encryption.MessageCrypto == nil {
			logCtx := fmt.Sprintf("[%v] [%v]", p.topic, p.options.Name)
			messageCrypto, err := crypto.NewDefaultMessageCrypto(logCtx, true,
				client.log.SubLogger(log.Fields{"topic": p.topic}))
			if err != nil {
				return nil, fmt.Errorf("unable to get MessageCrypto instance. Producer creation is abandoned. %w", err)
			}
			p.options.Encryption.MessageCrypto = messageCrypto
		}
	}

	err := p.internalCreatePartitionsProducers()
	if err != nil {
		return nil, err
	}

	p.stopDiscovery = p.runBackgroundPartitionDiscovery(options.PartitionsAutoDiscoveryInterval)

	p.metrics.ProducersOpened.Inc()
	return p, nil
}

func (p *producer) runBackgroundPartitionDiscovery(period time.Duration) (cancel func()) {
	var wg sync.WaitGroup
	stopDiscoveryCh := make(chan struct{})
	ticker := time.NewTicker(period)

	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-stopDiscoveryCh:
				return
			case <-ticker.C:
				p.log.Debug("Auto discovering new partitions")
				p.internalCreatePartitionsProducers()
			}
		}
	}()

	return func() {
		ticker.Stop()
		close(stopDiscoveryCh)
		wg.Wait()
	}
}
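// Usage sketch for the cancel function returned above (hypothetical call site):
//
//	stop := p.runBackgroundPartitionDiscovery(time.Minute)
//	// ... later, typically from Close():
//	stop() // stops the ticker, closes stopDiscoveryCh, and waits for the goroutine to exit
//
// The ticker-plus-stop-channel shape guarantees no discovery iteration is
// still running once the cancel function returns.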
func (p *producer) internalCreatePartitionsProducers() error {
	partitions, err := p.client.TopicPartitions(p.topic)
	if err != nil {
		return err
	}

	oldNumPartitions := 0
	newNumPartitions := len(partitions)

	p.Lock()
	defer p.Unlock()

	oldProducers := p.producers
	oldNumPartitions = len(oldProducers)

	if oldProducers != nil {
		if oldNumPartitions == newNumPartitions {
			p.log.Debug("Number of partitions in topic has not changed")
			return nil
		}

		p.log.WithField("old_partitions", oldNumPartitions).
			WithField("new_partitions", newNumPartitions).
			Info("Changed number of partitions in topic")
	}

	p.producers = make([]Producer, newNumPartitions)

	// If the partition count shrank for some reason (e.g. forced deletion of a
	// sub-partition), oldNumPartitions > newNumPartitions, so we must rebuild
	// the producer cache from scratch instead of copying; otherwise the copy
	// below would index out of bounds.
	if oldProducers != nil && oldNumPartitions < newNumPartitions {
		// Copy over the existing producer instances.
		for i := 0; i < oldNumPartitions; i++ {
			p.producers[i] = oldProducers[i]
		}
	}

	type ProducerError struct {
		partition int
		prod      Producer
		err       error
	}

	startPartition := oldNumPartitions
	partitionsToAdd := newNumPartitions - oldNumPartitions
	if partitionsToAdd < 0 {
		partitionsToAdd = newNumPartitions
		startPartition = 0
	}
	c := make(chan ProducerError, partitionsToAdd)

	// Create the missing partition producers concurrently.
	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
		partition := partitions[partitionIdx]

		go func(partitionIdx int, partition string) {
			prod, e := newPartitionProducer(p.client, partition, p.options, partitionIdx, p.metrics)
			c <- ProducerError{
				partition: partitionIdx,
				prod:      prod,
				err:       e,
			}
		}(partitionIdx, partition)
	}

	for i := 0; i < partitionsToAdd; i++ {
		pe, ok := <-c
		if ok {
			if pe.err != nil {
				err = pe.err
			} else {
				p.producers[pe.partition] = pe.prod
			}
		}
	}

	if err != nil {
		// Some partitions failed: clean up every producer that was created successfully.
		for _, producer := range p.producers {
			if producer != nil {
				producer.Close()
			}
		}
		return err
	}

	if newNumPartitions < oldNumPartitions {
		p.metrics.ProducersPartitions.Set(float64(newNumPartitions))
	} else {
		p.metrics.ProducersPartitions.Add(float64(partitionsToAdd))
	}

	atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
	atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
	return nil
}

func (p *producer) Topic() string {
	return p.topic
}

func (p *producer) Name() string {
	p.RLock()
	defer p.RUnlock()

	return p.producers[0].Name()
}

func (p *producer) NumPartitions() uint32 {
	return atomic.LoadUint32(&p.numPartitions)
}

func (p *producer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
	return p.getPartition(msg).Send(ctx, msg)
}

func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error)) {
	p.getPartition(msg).SendAsync(ctx, msg, callback)
}

func (p *producer) getPartition(msg *ProducerMessage) Producer {
	// Since the partition count can only increase, it's OK if the producers
	// list is swapped in between these two reads: numPartitions is updated
	// only after the list itself.
	partition := p.messageRouter(msg, p)
	producers := *(*[]Producer)(atomic.LoadPointer(&p.producersPtr))
	if partition >= len(producers) {
		// We read an older producers list while the partition count had
		// already been updated, so wrap the index back into range.
		partition %= len(producers)
	}
	return producers[partition]
}

func (p *producer) LastSequenceID() int64 {
	p.RLock()
	defer p.RUnlock()

	var maxSeq int64 = -1
	for _, pp := range p.producers {
		s := pp.LastSequenceID()
		if s > maxSeq {
			maxSeq = s
		}
	}
	return maxSeq
}

func (p *producer) Flush() error {
	return p.FlushWithCtx(context.Background())
}

func (p *producer) FlushWithCtx(ctx context.Context) error {
	p.RLock()
	defer p.RUnlock()

	for _, pp := range p.producers {
		if err := pp.FlushWithCtx(ctx); err != nil {
			return err
		}
	}
	return nil
}

func (p *producer) Close() {
	p.closeOnce.Do(func() {
		p.stopDiscovery()

		p.Lock()
		defer p.Unlock()

		for _, pp := range p.producers {
			pp.Close()
		}
		p.client.handlers.Del(p)
		p.metrics.ProducersPartitions.Sub(float64(len(p.producers)))
		p.metrics.ProducersClosed.Inc()
	})
}
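// End-to-end usage sketch for the producer implemented above, assuming a
// broker at pulsar://localhost:6650 and a topic named "my-topic" (both
// illustrative values):
//
//	client, err := NewClient(ClientOptions{URL: "pulsar://localhost:6650"})
//	if err != nil {
//		// handle err
//	}
//	defer client.Close()
//
//	producer, err := client.CreateProducer(ProducerOptions{Topic: "my-topic"})
//	if err != nil {
//		// handle err
//	}
//	defer producer.Close()
//
//	msgID, err := producer.Send(context.Background(), &ProducerMessage{
//		Payload: []byte("hello"),
//		Key:     "routing-key", // keyed messages hash to a stable partition
//	})
//
// On a partitioned topic, Send routes through getPartition, so the message
// lands on the partition chosen by the configured (or default) MessageRouter.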