func()

in pulsar/consumer_impl.go [303:444]


// internalTopicSubscribeToPartitions reconciles c.consumers with the current
// partition list of c.topic.
//
// The partition list is fetched through c.client.TopicPartitions; the rest of
// the work runs while holding the consumer lock:
//
//   - If consumers already exist and the partition count is unchanged, nothing
//     is done.
//   - If the count grew (or this is the first call), the existing partition
//     consumers are kept and one new partition consumer is created,
//     concurrently, for every added partition.
//   - If the count shrank, the table is rebuilt and a partition consumer is
//     created for every remaining partition.
//
// On success the ConsumersPartitions metric is updated and nil is returned.
// If creating any partition consumer fails, the consumers created by this
// call are closed, c.consumers is restored to the value it had on entry and
// one of the creation errors is returned. Consumers that existed before the
// call are left untouched so that they keep working and a later call can
// retry the subscription.
func (c *consumer) internalTopicSubscribeToPartitions() error {
	partitions, err := c.client.TopicPartitions(c.topic)
	if err != nil {
		return err
	}

	newNumPartitions := len(partitions)

	c.Lock()
	defer c.Unlock()

	oldConsumers := c.consumers
	oldNumPartitions := len(oldConsumers)

	if oldConsumers != nil {
		if oldNumPartitions == newNumPartitions {
			c.log.Debug("Number of partitions in topic has not changed")
			return nil
		}

		c.log.WithField("old_partitions", oldNumPartitions).
			WithField("new_partitions", newNumPartitions).
			Info("Changed number of partitions in topic")
	}

	c.consumers = make([]*partitionConsumer, newNumPartitions)

	// By default only the partitions beyond the ones already known are
	// subscribed to, and the existing consumer instances are carried over.
	//
	// When for some reason (eg: forced deletion of a sub partition) the number
	// of partitions decreased, the old table cannot be copied into the smaller
	// one without going out of bounds, so every remaining partition gets a
	// fresh consumer instead.
	// NOTE(review): in that case the previous partition consumers are replaced
	// in c.consumers without being closed here - confirm whether they are
	// released elsewhere.
	startPartition := oldNumPartitions
	if oldNumPartitions < newNumPartitions {
		copy(c.consumers, oldConsumers)
	} else {
		startPartition = 0
	}
	partitionsToAdd := newNumPartitions - startPartition

	// consumerResult carries the outcome of creating one partition consumer.
	type consumerResult struct {
		err       error
		partition int
		consumer  *partitionConsumer
	}

	receiverQueueSize := c.options.ReceiverQueueSize
	metadata := c.options.Properties
	subProperties := c.options.SubscriptionProperties

	// The redelivery delay is the same for every partition; resolve it once.
	nackRedeliveryDelay := c.options.NackRedeliveryDelay
	if nackRedeliveryDelay == 0 {
		nackRedeliveryDelay = defaultNackRedeliveryDelay
	}

	// The channel is buffered for one result per goroutine, so no sender can
	// block even if results are consumed slowly.
	var wg sync.WaitGroup
	ch := make(chan consumerResult, partitionsToAdd)
	wg.Add(partitionsToAdd)

	for partitionIdx := startPartition; partitionIdx < newNumPartitions; partitionIdx++ {
		partitionTopic := partitions[partitionIdx]

		go func(idx int, pt string) {
			defer wg.Done()

			opts := &partitionConsumerOpts{
				topic:                       pt,
				consumerName:                c.consumerName,
				subscription:                c.options.SubscriptionName,
				subscriptionType:            c.options.Type,
				subscriptionInitPos:         c.options.SubscriptionInitialPosition,
				partitionIdx:                idx,
				receiverQueueSize:           receiverQueueSize,
				nackRedeliveryDelay:         nackRedeliveryDelay,
				nackBackoffPolicy:           c.options.NackBackoffPolicy,
				metadata:                    metadata,
				subProperties:               subProperties,
				replicateSubscriptionState:  c.options.ReplicateSubscriptionState,
				startMessageID:              nil,
				subscriptionMode:            c.options.SubscriptionMode,
				readCompacted:               c.options.ReadCompacted,
				interceptors:                c.options.Interceptors,
				maxReconnectToBroker:        c.options.MaxReconnectToBroker,
				backoffPolicy:               c.options.BackoffPolicy,
				keySharedPolicy:             c.options.KeySharedPolicy,
				schema:                      c.options.Schema,
				decryption:                  c.options.Decryption,
				ackWithResponse:             c.options.AckWithResponse,
				maxPendingChunkedMessage:    c.options.MaxPendingChunkedMessage,
				expireTimeOfIncompleteChunk: c.options.ExpireTimeOfIncompleteChunk,
				autoAckIncompleteChunk:      c.options.AutoAckIncompleteChunk,
				consumerEventListener:       c.options.EventListener,
				enableBatchIndexAck:         c.options.EnableBatchIndexAcknowledgment,
				ackGroupingOptions:          c.options.AckGroupingOptions,
				autoReceiverQueueSize:       c.options.EnableAutoScaledReceiverQueueSize,
			}
			cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics)
			ch <- consumerResult{
				err:       err,
				partition: idx,
				consumer:  cons,
			}
		}(partitionIdx, partitionTopic)
	}

	// Close the channel once every goroutine has reported, which ends the
	// collection loop below.
	go func() {
		wg.Wait()
		close(ch)
	}()

	for res := range ch {
		if res.err != nil {
			// Only the most recently received error is reported.
			err = res.err
		} else {
			c.consumers[res.partition] = res.consumer
		}
	}

	if err != nil {
		// Since there were some failures, clean up the partition consumers
		// that this call managed to create. Entries below startPartition were
		// carried over from the previous table and must stay open.
		for _, pc := range c.consumers[startPartition:] {
			if pc != nil {
				pc.Close()
			}
		}
		// Roll back to the previous table so it never exposes closed or nil
		// entries, and so the next call sees the partition count as changed
		// and retries.
		c.consumers = oldConsumers
		return err
	}

	if newNumPartitions < oldNumPartitions {
		c.metrics.ConsumersPartitions.Set(float64(newNumPartitions))
	} else {
		c.metrics.ConsumersPartitions.Add(float64(partitionsToAdd))
	}
	return nil
}