in pulsar/consumer_impl.go [76:246]
// newConsumer validates options, fills in defaults, and builds the consumer
// that matches the requested topic selection: a single topic, several topics,
// or a topic pattern.
//
// It returns an error when no topic, topic list, or pattern is given, when the
// subscription name is empty, or when any topic name fails to parse/validate.
//
// options is received by value, but its Topics slice shares a backing array
// with the caller's slice. This function therefore never appends to or writes
// through options.Topics directly; it builds fresh slices instead so the
// caller's data is left untouched.
//
// NOTE(review): a caller-supplied options.DLQ is a pointer and its empty
// DeadLetterTopic/RetryLetterTopic fields are filled in through that pointer,
// so the caller's policy value is updated when RetryEnable is set.
func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
	if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
		return nil, newError(TopicNotFound, "topic is required")
	}

	if options.SubscriptionName == "" {
		return nil, newError(SubscriptionNotFound, "subscription name is required for consumer")
	}

	// Non-positive queue sizes fall back to the default; the zero-queue flag
	// is applied afterwards so it always wins.
	if options.ReceiverQueueSize <= 0 {
		options.ReceiverQueueSize = defaultReceiverQueueSize
	}

	if options.EnableZeroQueueConsumer {
		options.ReceiverQueueSize = 0
	}

	if options.Interceptors == nil {
		options.Interceptors = defaultConsumerInterceptors
	}

	if options.Name == "" {
		options.Name = generateRandomName()
	}

	// A schema whose info reports type NONE is replaced with a bytes schema.
	if options.Schema != nil && options.Schema.GetSchemaInfo() != nil {
		if options.Schema.GetSchemaInfo().Type == NONE {
			options.Schema = NewBytesSchema(nil)
		}
	}

	if options.MaxPendingChunkedMessage == 0 {
		options.MaxPendingChunkedMessage = 100
	}

	if options.ExpireTimeOfIncompleteChunk == 0 {
		options.ExpireTimeOfIncompleteChunk = time.Minute
	}

	if options.NackBackoffPolicy == nil && options.EnableDefaultNackBackoffPolicy {
		options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
	}

	// did the user pass in a message channel?
	messageCh := options.MessageChannel
	if options.MessageChannel == nil {
		messageCh = make(chan ConsumerMessage, 10)
	}

	if options.RetryEnable {
		// The retry/DLQ topic names are derived from Topic, or from the first
		// entry of Topics when Topic is empty.
		usingTopic := ""
		if options.Topic != "" {
			usingTopic = options.Topic
		} else if len(options.Topics) > 0 {
			usingTopic = options.Topics[0]
		}
		tn, err := internal.ParseTopicName(usingTopic)
		if err != nil {
			return nil, err
		}

		topicName := internal.TopicNameWithoutPartitionPart(tn)

		retryTopic := topicName + "-" + options.SubscriptionName + RetryTopicSuffix
		dlqTopic := topicName + "-" + options.SubscriptionName + DlqTopicSuffix

		// Namespace-level names built from the subscription only. They are
		// preferred when the lookup reports partitions for them; lookup errors
		// are ignored and leave the topic-based names in place.
		oldRetryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix
		oldDlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix

		if r, err := client.lookupService.GetPartitionedTopicMetadata(oldRetryTopic); err == nil &&
			r != nil &&
			r.Partitions > 0 {
			retryTopic = oldRetryTopic
		}

		if r, err := client.lookupService.GetPartitionedTopicMetadata(oldDlqTopic); err == nil &&
			r != nil &&
			r.Partitions > 0 {
			dlqTopic = oldDlqTopic
		}

		if options.DLQ == nil {
			options.DLQ = &DLQPolicy{
				MaxDeliveries:    MaxReconsumeTimes,
				DeadLetterTopic:  dlqTopic,
				RetryLetterTopic: retryTopic,
			}
		} else {
			if options.DLQ.DeadLetterTopic == "" {
				options.DLQ.DeadLetterTopic = dlqTopic
			}
			if options.DLQ.RetryLetterTopic == "" {
				options.DLQ.RetryLetterTopic = retryTopic
			}
		}

		// Subscribe to the retry topic alongside the requested topic(s).
		// NOTE(review): when both Topic and Topics are set, neither branch
		// runs and the retry topic is not added — confirm this is intended.
		if options.Topic != "" && len(options.Topics) == 0 {
			options.Topics = []string{options.Topic, options.DLQ.RetryLetterTopic}
			options.Topic = ""
		} else if options.Topic == "" && len(options.Topics) > 0 {
			// Copy before appending: appending in place could write into the
			// spare capacity of the caller's backing array.
			topics := make([]string, 0, len(options.Topics)+1)
			topics = append(topics, options.Topics...)
			topics = append(topics, options.DLQ.RetryLetterTopic)
			options.Topics = topics
		}
	}

	dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name,
		options.BackOffPolicyFunc, client.log)
	if err != nil {
		return nil, err
	}
	rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, options.BackOffPolicyFunc, client.log)
	if err != nil {
		return nil, err
	}

	// normalize as FQDN topics
	var tns []*internal.TopicName
	// single topic consumer
	if options.Topic != "" || len(options.Topics) == 1 {
		topic := options.Topic
		if topic == "" {
			topic = options.Topics[0]
		}

		if tns, err = validateTopicNames(topic); err != nil {
			return nil, err
		}
		topic = tns[0].Name
		err = addMessageCryptoIfMissing(client, &options, topic)
		if err != nil {
			return nil, err
		}
		return newInternalConsumer(client, options, topic, messageCh, dlq, rlq, false)
	}

	// multi topic consumer
	if len(options.Topics) > 1 {
		if tns, err = validateTopicNames(options.Topics...); err != nil {
			return nil, err
		}
		// Write the normalized names into a new slice rather than through
		// options.Topics, which may still alias the caller's slice.
		normalized := make([]string, len(options.Topics))
		for i := range normalized {
			normalized[i] = tns[i].Name
		}
		options.Topics = distinct(normalized)

		err = addMessageCryptoIfMissing(client, &options, options.Topics)
		if err != nil {
			return nil, err
		}
		return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
	}

	// pattern (regex) consumer
	if options.TopicsPattern != "" {
		tn, err := internal.ParseTopicName(options.TopicsPattern)
		if err != nil {
			return nil, err
		}

		pattern, err := extractTopicPattern(tn)
		if err != nil {
			return nil, err
		}
		err = addMessageCryptoIfMissing(client, &options, tn.Name)
		if err != nil {
			return nil, err
		}
		return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq)
	}

	return nil, newError(InvalidTopicName, "topic name is required for consumer")
}