func newConsumer()

in pulsar/consumer_impl.go [76:246]


func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
	if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
		return nil, newError(TopicNotFound, "topic is required")
	}

	if options.SubscriptionName == "" {
		return nil, newError(SubscriptionNotFound, "subscription name is required for consumer")
	}

	if options.ReceiverQueueSize <= 0 {
		options.ReceiverQueueSize = defaultReceiverQueueSize
	}

	if options.EnableZeroQueueConsumer {
		options.ReceiverQueueSize = 0
	}

	if options.Interceptors == nil {
		options.Interceptors = defaultConsumerInterceptors
	}

	if options.Name == "" {
		options.Name = generateRandomName()
	}

	if options.Schema != nil && options.Schema.GetSchemaInfo() != nil {
		if options.Schema.GetSchemaInfo().Type == NONE {
			options.Schema = NewBytesSchema(nil)
		}
	}

	if options.MaxPendingChunkedMessage == 0 {
		options.MaxPendingChunkedMessage = 100
	}

	if options.ExpireTimeOfIncompleteChunk == 0 {
		options.ExpireTimeOfIncompleteChunk = time.Minute
	}

	if options.NackBackoffPolicy == nil && options.EnableDefaultNackBackoffPolicy {
		options.NackBackoffPolicy = new(defaultNackBackoffPolicy)
	}

	// did the user pass in a message channel?
	messageCh := options.MessageChannel
	if options.MessageChannel == nil {
		messageCh = make(chan ConsumerMessage, 10)
	}

	if options.RetryEnable {
		usingTopic := ""
		if options.Topic != "" {
			usingTopic = options.Topic
		} else if len(options.Topics) > 0 {
			usingTopic = options.Topics[0]
		}
		tn, err := internal.ParseTopicName(usingTopic)
		if err != nil {
			return nil, err
		}

		topicName := internal.TopicNameWithoutPartitionPart(tn)

		retryTopic := topicName + "-" + options.SubscriptionName + RetryTopicSuffix
		dlqTopic := topicName + "-" + options.SubscriptionName + DlqTopicSuffix

		oldRetryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix
		oldDlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix

		if r, err := client.lookupService.GetPartitionedTopicMetadata(oldRetryTopic); err == nil &&
			r != nil &&
			r.Partitions > 0 {
			retryTopic = oldRetryTopic
		}

		if r, err := client.lookupService.GetPartitionedTopicMetadata(oldDlqTopic); err == nil &&
			r != nil &&
			r.Partitions > 0 {
			dlqTopic = oldDlqTopic
		}

		if options.DLQ == nil {
			options.DLQ = &DLQPolicy{
				MaxDeliveries:    MaxReconsumeTimes,
				DeadLetterTopic:  dlqTopic,
				RetryLetterTopic: retryTopic,
			}
		} else {
			if options.DLQ.DeadLetterTopic == "" {
				options.DLQ.DeadLetterTopic = dlqTopic
			}
			if options.DLQ.RetryLetterTopic == "" {
				options.DLQ.RetryLetterTopic = retryTopic
			}
		}
		if options.Topic != "" && len(options.Topics) == 0 {
			options.Topics = []string{options.Topic, options.DLQ.RetryLetterTopic}
			options.Topic = ""
		} else if options.Topic == "" && len(options.Topics) > 0 {
			options.Topics = append(options.Topics, options.DLQ.RetryLetterTopic)
		}
	}

	dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, options.Name,
		options.BackOffPolicyFunc, client.log)
	if err != nil {
		return nil, err
	}
	rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, options.BackOffPolicyFunc, client.log)
	if err != nil {
		return nil, err
	}

	// normalize as FQDN topics
	var tns []*internal.TopicName
	// single topic consumer
	if options.Topic != "" || len(options.Topics) == 1 {
		topic := options.Topic
		if topic == "" {
			topic = options.Topics[0]
		}

		if tns, err = validateTopicNames(topic); err != nil {
			return nil, err
		}
		topic = tns[0].Name
		err = addMessageCryptoIfMissing(client, &options, topic)
		if err != nil {
			return nil, err
		}
		return newInternalConsumer(client, options, topic, messageCh, dlq, rlq, false)
	}

	if len(options.Topics) > 1 {
		if tns, err = validateTopicNames(options.Topics...); err != nil {
			return nil, err
		}
		for i := range options.Topics {
			options.Topics[i] = tns[i].Name
		}
		options.Topics = distinct(options.Topics)

		err = addMessageCryptoIfMissing(client, &options, options.Topics)
		if err != nil {
			return nil, err
		}

		return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
	}

	if options.TopicsPattern != "" {
		tn, err := internal.ParseTopicName(options.TopicsPattern)
		if err != nil {
			return nil, err
		}

		pattern, err := extractTopicPattern(tn)
		if err != nil {
			return nil, err
		}

		err = addMessageCryptoIfMissing(client, &options, tn.Name)
		if err != nil {
			return nil, err
		}

		return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq)
	}

	return nil, newError(InvalidTopicName, "topic name is required for consumer")
}