in pulsar/consumer_impl.go [303:444]
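// internalTopicSubscribeToPartitions fetches the topic's current partition
// count and reconciles c.consumers with it: existing partitionConsumers are
// kept where possible, and consumers for new partitions are created
// concurrently.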
func (c *consumer) internalTopicSubscribeToPartitions() error {
	partitions, err := c.client.TopicPartitions(c.topic)
	if err != nil {
		return err
	}

	oldNumPartitions := 0
	newNumPartitions := len(partitions)

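	// Everything below mutates c.consumers, so hold the consumer lock for
	// the remainder of the method.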
	c.Lock()
	defer c.Unlock()

	oldConsumers := c.consumers
	oldNumPartitions = len(oldConsumers)
	if oldConsumers != nil {
		if oldNumPartitions == newNumPartitions {
			c.log.Debug("Number of partitions in topic has not changed")
			return nil
		}

		c.log.WithField("old_partitions", oldNumPartitions).
			WithField("new_partitions", newNumPartitions).
			Info("Changed number of partitions in topic")
	}

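	// Size the consumer slice for the new partition count; surviving
	// consumers are copied into it below.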
	c.consumers = make([]*partitionConsumer, newNumPartitions)

	// If something out of band (e.g. a forced deletion of a subscription's
	// partition) leaves oldNumPartitions > newNumPartitions, rebuild the
	// consumer cache from scratch rather than copying: copying by index
	// would write past the end of the new, smaller slice.
	if oldConsumers != nil && oldNumPartitions < newNumPartitions {
		// Copy over the existing consumer instances
		for i := 0; i < oldNumPartitions; i++ {
			c.consumers[i] = oldConsumers[i]
		}
	}

	type ConsumerError struct {
		err       error
		partition int
		consumer  *partitionConsumer
	}

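	// Snapshot option values used by every per-partition goroutine.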
	receiverQueueSize := c.options.ReceiverQueueSize
	metadata := c.options.Properties
	subProperties := c.options.SubscriptionProperties

	startPartition := oldNumPartitions
	partitionsToAdd := newNumPartitions - oldNumPartitions
	if partitionsToAdd < 0 {
		partitionsToAdd = newNumPartitions
		startPartition = 0
	}

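	// Create the missing partition consumers concurrently, one goroutine per
	// partition; each goroutine reports its outcome on the buffered channel.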
	var wg sync.WaitGroup
	ch := make(chan ConsumerError, partitionsToAdd)
	wg.Add(partitionsToAdd)

	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
		partitionTopic := partitions[partitionIdx]

		go func(idx int, pt string) {
			defer wg.Done()

			var nackRedeliveryDelay time.Duration
			if c.options.NackRedeliveryDelay == 0 {
				nackRedeliveryDelay = defaultNackRedeliveryDelay
			} else {
				nackRedeliveryDelay = c.options.NackRedeliveryDelay
			}
			opts := &partitionConsumerOpts{
				topic:                       pt,
				consumerName:                c.consumerName,
				subscription:                c.options.SubscriptionName,
				subscriptionType:            c.options.Type,
				subscriptionInitPos:         c.options.SubscriptionInitialPosition,
				partitionIdx:                idx,
				receiverQueueSize:           receiverQueueSize,
				nackRedeliveryDelay:         nackRedeliveryDelay,
				nackBackoffPolicy:           c.options.NackBackoffPolicy,
				metadata:                    metadata,
				subProperties:               subProperties,
				replicateSubscriptionState:  c.options.ReplicateSubscriptionState,
				startMessageID:              nil,
				subscriptionMode:            c.options.SubscriptionMode,
				readCompacted:               c.options.ReadCompacted,
				interceptors:                c.options.Interceptors,
				maxReconnectToBroker:        c.options.MaxReconnectToBroker,
				backoffPolicy:               c.options.BackoffPolicy,
				keySharedPolicy:             c.options.KeySharedPolicy,
				schema:                      c.options.Schema,
				decryption:                  c.options.Decryption,
				ackWithResponse:             c.options.AckWithResponse,
				maxPendingChunkedMessage:    c.options.MaxPendingChunkedMessage,
				expireTimeOfIncompleteChunk: c.options.ExpireTimeOfIncompleteChunk,
				autoAckIncompleteChunk:      c.options.AutoAckIncompleteChunk,
				consumerEventListener:       c.options.EventListener,
				enableBatchIndexAck:         c.options.EnableBatchIndexAcknowledgment,
				ackGroupingOptions:          c.options.AckGroupingOptions,
				autoReceiverQueueSize:       c.options.EnableAutoScaledReceiverQueueSize,
			}
			cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
			ch <- ConsumerError{
				err:       err,
				partition: idx,
				consumer:  cons,
			}
		}(partitionIdx, partitionTopic)
	}

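	// Close ch once every goroutine has reported, so the collection loop
	// below terminates.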
	go func() {
		wg.Wait()
		close(ch)
	}()

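	// Collect the results: remember the last error seen, and store each
	// successfully created consumer at its partition index.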
	for ce := range ch {
		if ce.err != nil {
			err = ce.err
		} else {
			c.consumers[ce.partition] = ce.consumer
		}
	}

	if err != nil {
		// At least one partition failed: close every consumer that was
		// created successfully, then surface the error.
		for _, pc := range c.consumers {
			if pc != nil {
				pc.Close()
			}
		}
		return err
	}

	if newNumPartitions < oldNumPartitions {
		c.metrics.ConsumersPartitions.Set(float64(newNumPartitions))
	} else {
		c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
	}
	return nil
}