in pulsar/consumer_impl.go [330:430]
// internalTopicSubscribeToPartitions looks up the current partitions of
// c.topic and (re)builds c.consumers so that it holds one partitionConsumer
// per partition.
//
// Depending on how the partition count compares with the number of consumers
// currently held:
//   - unchanged (and c.consumers is non-nil): nothing is done;
//   - grown, or first call: existing consumers are kept and consumers are
//     created only for the new partition indexes;
//   - shrunk: the slice is rebuilt from scratch and a consumer is created for
//     every partition.
//
// The consumer lock is held for the whole call. The method returns the
// partition lookup error, or one of the errors reported while creating the
// partition consumers; in the latter case every non-nil entry of c.consumers
// is closed before returning.
func (c *consumer) internalTopicSubscribeToPartitions() error {
	partitions, err := c.client.TopicPartitions(c.topic)
	if err != nil {
		return err
	}

	newNumPartitions := len(partitions)

	c.Lock()
	defer c.Unlock()

	oldConsumers := c.consumers
	oldNumPartitions := len(oldConsumers)

	if oldConsumers != nil {
		if oldNumPartitions == newNumPartitions {
			c.log.Debug("Number of partitions in topic has not changed")
			return nil
		}

		c.log.WithField("old_partitions", oldNumPartitions).
			WithField("new_partitions", newNumPartitions).
			Info("Changed number of partitions in topic")
	}

	c.consumers = make([]*partitionConsumer, newNumPartitions)

	// When for some reason (eg: forced deletion of sub partition) causes oldNumPartitions> newNumPartitions,
	// we need to rebuild the cache of new consumers, otherwise the array will be out of bounds.
	if oldConsumers != nil && oldNumPartitions < newNumPartitions {
		// Copy over the existing consumer instances
		copy(c.consumers, oldConsumers)
	}

	// ConsumerError carries the outcome of creating one partition consumer
	// back from its goroutine to the collecting loop below.
	type ConsumerError struct {
		err       error
		partition int
		consumer  *partitionConsumer
	}

	// By default only the partitions beyond the old count are subscribed.
	// If the topic shrank, start over from partition 0.
	startPartition := oldNumPartitions
	partitionsToAdd := newNumPartitions - oldNumPartitions
	if partitionsToAdd < 0 {
		partitionsToAdd = newNumPartitions
		startPartition = 0
	}

	var wg sync.WaitGroup
	// Buffered to the number of producers so no goroutine blocks on send.
	ch := make(chan ConsumerError, partitionsToAdd)
	wg.Add(partitionsToAdd)

	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
		// Per-iteration copies: before Go 1.22 the loop variable is shared by
		// all iterations, so a goroutine capturing partitionIdx directly could
		// observe a later (or past-the-end) index.
		idx := partitionIdx
		partitionTopic := partitions[idx]

		go func() {
			defer wg.Done()

			opts := newPartitionConsumerOpts(partitionTopic, c.consumerName, idx, c.options)
			cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
			ch <- ConsumerError{
				err:       err,
				partition: idx,
				consumer:  cons,
			}
		}()
	}

	// Close the channel once every creation attempt has reported, which ends
	// the range loop below.
	go func() {
		wg.Wait()
		close(ch)
	}()

	for ce := range ch {
		if ce.err != nil {
			// Only the most recently received error is kept.
			err = ce.err
		} else {
			c.consumers[ce.partition] = ce.consumer
		}
	}

	if err != nil {
		// Since there were some failures,
		// cleanup all the partitions that succeeded in creating the consumer
		//
		// NOTE(review): this also closes the pre-existing consumers that were
		// copied over above, and c.consumers keeps referencing the closed
		// (and possibly nil) entries afterwards — confirm this is intended.
		for _, pc := range c.consumers {
			if pc != nil {
				pc.Close()
			}
		}
		return err
	}

	if newNumPartitions < oldNumPartitions {
		c.metrics.ConsumersPartitions.Set(float64(newNumPartitions))
	} else {
		c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
	}
	return nil
}