func()

in pulsar/consumer_impl.go [330:430]


// internalTopicSubscribeToPartitions reconciles the per-partition consumers
// held in c.consumers with the current partition list of c.topic.
//
// It looks up the partitions through c.client.TopicPartitions and then, while
// holding the consumer lock for the remainder of the call:
//
//   - returns nil immediately if consumers already exist and the partition
//     count has not changed;
//   - if the partition count grew, keeps the existing partition consumers and
//     creates consumers only for the newly added partitions;
//   - if there were no consumers yet, or the partition count shrank, creates
//     consumers for every partition starting from index 0.
//
// Partition consumers are created concurrently, one goroutine per partition.
// If any creation fails, every non-nil entry in c.consumers is closed
// (including entries carried over from the previous set) and one of the
// reported errors is returned. On success the ConsumersPartitions metric is
// updated to reflect the new partition count.
func (c *consumer) internalTopicSubscribeToPartitions() error {
	partitions, err := c.client.TopicPartitions(c.topic)
	if err != nil {
		return err
	}

	newNumPartitions := len(partitions)

	c.Lock()
	defer c.Unlock()

	oldConsumers := c.consumers
	oldNumPartitions := len(oldConsumers)

	if oldConsumers != nil {
		if oldNumPartitions == newNumPartitions {
			c.log.Debug("Number of partitions in topic has not changed")
			return nil
		}

		c.log.WithField("old_partitions", oldNumPartitions).
			WithField("new_partitions", newNumPartitions).
			Info("Changed number of partitions in topic")
	}

	c.consumers = make([]*partitionConsumer, newNumPartitions)

	// When oldNumPartitions > newNumPartitions (e.g. a sub-partition was
	// force-deleted) the old entries cannot be carried over by index, so the
	// whole set is rebuilt from scratch instead.
	// NOTE(review): in that case the consumers in oldConsumers are not closed
	// by this function — confirm they are released elsewhere.
	if oldConsumers != nil && oldNumPartitions < newNumPartitions {
		// Carry over the existing consumer instances.
		copy(c.consumers, oldConsumers)
	}

	// consumerResult is the outcome of creating one partition consumer.
	type consumerResult struct {
		err       error
		partition int
		consumer  *partitionConsumer
	}

	startPartition := oldNumPartitions
	partitionsToAdd := newNumPartitions - oldNumPartitions

	if partitionsToAdd < 0 {
		// Partition count shrank: recreate every consumer.
		partitionsToAdd = newNumPartitions
		startPartition = 0
	}

	var wg sync.WaitGroup
	// Buffered to the number of producers so no goroutine blocks on send.
	ch := make(chan consumerResult, partitionsToAdd)
	wg.Add(partitionsToAdd)

	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
		partitionTopic := partitions[partitionIdx]

		// The index is passed as an argument rather than captured: with
		// pre-Go-1.22 loop semantics a captured loop variable is shared by all
		// goroutines, which would make them report the wrong (or an
		// out-of-range) partition index.
		go func(idx int) {
			defer wg.Done()
			opts := newPartitionConsumerOpts(partitionTopic, c.consumerName, idx, c.options)
			cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
			ch <- consumerResult{
				err:       err,
				partition: idx,
				consumer:  cons,
			}
		}(partitionIdx)
	}

	// Close the channel once every creation has reported, so the range below
	// terminates.
	go func() {
		wg.Wait()
		close(ch)
	}()

	for res := range ch {
		if res.err != nil {
			err = res.err
		} else {
			c.consumers[res.partition] = res.consumer
		}
	}

	if err != nil {
		// Since there were some failures,
		// cleanup all the partitions that succeeded in creating the consumer
		for _, pc := range c.consumers {
			if pc != nil {
				pc.Close()
			}
		}
		return err
	}

	if newNumPartitions < oldNumPartitions {
		c.metrics.ConsumersPartitions.Set(float64(newNumPartitions))
	} else {
		c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
	}
	return nil
}