in pulsar/consumer_impl.go [248:297]
// newInternalConsumer builds the Consumer for a single topic.
//
// It asks the client for the topic's partitions and then picks an
// implementation:
//   - With options.EnableZeroQueueConsumer set, a topic that reports more than
//     one partition, or exactly one partition whose name contains
//     utils.PARTITIONEDTOPICSUFFIX, is rejected with an error. A topic that
//     reports exactly one partition without that suffix is handed to
//     newZeroConsumer.
//   - In every other case (including zero reported partitions) a regular
//     consumer is created, subscribed through
//     internalTopicSubscribeToPartitions, and given a background partition
//     discovery task.
//
// Errors from the partition lookup or from the subscription are returned
// unchanged together with a nil Consumer.
func newInternalConsumer(client *client, options ConsumerOptions, topic string,
	messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (Consumer, error) {
	partitions, err := client.TopicPartitions(topic)
	if err != nil {
		return nil, err
	}

	// Zero-queue consumers are only created for topics that resolve to a
	// single partition whose name does not carry the partitioned-topic suffix.
	if options.EnableZeroQueueConsumer {
		partitionCount := len(partitions)
		switch {
		case partitionCount > 1:
			return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
		case partitionCount == 1:
			if strings.Contains(partitions[0], utils.PARTITIONEDTOPICSUFFIX) {
				return nil, pkgerrors.New("ZeroQueueConsumer is not supported for partitioned topics")
			}
			// NOTE(review): if newZeroConsumer returns a concrete pointer type,
			// a nil pointer returned alongside an error becomes a non-nil
			// Consumer here — confirm callers only inspect the error.
			return newZeroConsumer(client, options, topic, messageCh, dlq, rlq, disableForceTopicCreation)
		}
		// No partitions reported: fall through to the regular consumer below.
	}

	topicLogger := client.log.SubLogger(log.Fields{"topic": topic})
	topicMetrics := client.metrics.GetLeveledMetrics(topic)

	c := &consumer{
		client:                    client,
		options:                   options,
		topic:                     topic,
		consumerName:              options.Name,
		disableForceTopicCreation: disableForceTopicCreation,
		messageCh:                 messageCh,
		closeCh:                   make(chan struct{}),
		errorCh:                   make(chan error),
		dlq:                       dlq,
		rlq:                       rlq,
		log:                       topicLogger,
		metrics:                   topicMetrics,
	}

	if err = c.internalTopicSubscribeToPartitions(); err != nil {
		return nil, err
	}

	// Watch for partitions added later; a non-positive configured period
	// selects the package default.
	discoveryPeriod := options.AutoDiscoveryPeriod
	if discoveryPeriod <= 0 {
		discoveryPeriod = defaultAutoDiscoveryDuration
	}
	c.stopDiscovery = c.runBackgroundPartitionDiscovery(discoveryPeriod)

	c.metrics.ConsumersOpened.Inc()
	return c, nil
}