func newInternalConsumer()

in pulsar/consumer_impl.go [248:297]


// newInternalConsumer builds the Consumer for a single topic.
//
// It first resolves the topic's partitions through the client. When
// options.EnableZeroQueueConsumer is set, a topic that resolves to more than
// one partition, or to a single partition whose name contains
// utils.PARTITIONEDTOPICSUFFIX, is rejected with an error; a single
// non-partitioned topic is delegated to newZeroConsumer.
//
// In every other case a consumer is constructed, subscribed to the topic's
// partitions, and background partition discovery is started using
// options.AutoDiscoveryPeriod (falling back to defaultAutoDiscoveryDuration
// when the period is not positive). The ConsumersOpened metric is incremented
// just before the consumer is returned.
//
// Any error from the partition lookup or from the subscription is returned
// unchanged together with a nil Consumer.
func newInternalConsumer(client *client, options ConsumerOptions, topic string,
	messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (Consumer, error) {
	partitions, err := client.TopicPartitions(topic)
	if err != nil {
		return nil, err
	}

	// Zero-queue consumers only work against exactly one non-partitioned topic.
	if options.EnableZeroQueueConsumer {
		switch {
		case len(partitions) > 1:
			return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
		case len(partitions) == 1:
			if strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) {
				return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
			}
			return newZeroConsumer(client, options, topic, messageCh, dlq, rlq, disableForceTopicCreation)
		}
		// No partitions reported: fall through to the regular consumer path.
	}

	c := &consumer{
		client:                    client,
		topic:                     topic,
		options:                   options,
		consumerName:              options.Name,
		disableForceTopicCreation: disableForceTopicCreation,
		dlq:                       dlq,
		rlq:                       rlq,
		messageCh:                 messageCh,
		closeCh:                   make(chan struct{}),
		errorCh:                   make(chan error),
		log:                       client.log.SubLogger(log.Fields{"topic": topic}),
		metrics:                   client.metrics.GetLeveledMetrics(topic),
	}

	if err := c.internalTopicSubscribeToPartitions(); err != nil {
		return nil, err
	}

	// Periodically check whether new partitions have been added to the topic.
	period := options.AutoDiscoveryPeriod
	if period <= 0 {
		period = defaultAutoDiscoveryDuration
	}
	c.stopDiscovery = c.runBackgroundPartitionDiscovery(period)

	c.metrics.ConsumersOpened.Inc()
	return c, nil
}