func newReader()

in pulsar/reader_impl.go [46:155]


// newReader validates options, fills in defaults, and builds a reader backed
// by an internal consumer on options.Topic that uses an Exclusive, NonDurable
// subscription.
//
// Topic and StartMessageID are required; an InvalidConfiguration error is
// returned when either is missing. The following defaults are applied:
//   - SubscriptionName: when empty, SubscriptionRolePrefix (or "reader" if that
//     is empty too) followed by "-" and a generated random name.
//   - ReceiverQueueSize: defaultReceiverQueueSize when not positive.
//   - Decryption.MessageCrypto: a default message crypto when Decryption is set
//     but no MessageCrypto was supplied.
//   - MaxPendingChunkedMessage: 100 when zero.
//   - ExpireTimeOfIncompleteChunk: one minute when zero.
//
// Errors from message-ID deserialization, crypto setup, router creation and
// consumer creation are returned unchanged.
func newReader(client *client, options ReaderOptions) (Reader, error) {
	if options.Topic == "" {
		return nil, newError(InvalidConfiguration, "Topic is required")
	}

	if options.StartMessageID == nil {
		return nil, newError(InvalidConfiguration, "StartMessageID is required")
	}

	var startMessageID *trackingMessageID
	if !checkMessageIDType(options.StartMessageID) {
		// A custom type satisfying MessageID may not be a messageID or
		// trackingMessageID, so re-create a messageID from its serialized data.
		deserMsgID, err := deserializeMessageID(options.StartMessageID.Serialize())
		if err != nil {
			return nil, err
		}
		// The de-serialized MessageID is a messageID.
		startMessageID = toTrackingMessageID(deserMsgID)
	} else {
		startMessageID = toTrackingMessageID(options.StartMessageID)
	}

	// Without an explicit subscription name, generate one from the role prefix
	// (or "reader") plus a random suffix.
	subscriptionName := options.SubscriptionName
	if subscriptionName == "" {
		subscriptionName = options.SubscriptionRolePrefix
		if subscriptionName == "" {
			subscriptionName = "reader"
		}
		subscriptionName += "-" + generateRandomName()
	}

	receiverQueueSize := options.ReceiverQueueSize
	if receiverQueueSize <= 0 {
		receiverQueueSize = defaultReceiverQueueSize
	}

	// Decryption is enabled: use the default message crypto if none was provided.
	// The assignment goes through options.Decryption, so it is written to the
	// same Decryption value the caller passed in.
	if options.Decryption != nil && options.Decryption.MessageCrypto == nil {
		messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt",
			false,
			client.log.SubLogger(log.Fields{"topic": options.Topic}))
		if err != nil {
			return nil, err
		}
		options.Decryption.MessageCrypto = messageCrypto
	}

	if options.MaxPendingChunkedMessage == 0 {
		options.MaxPendingChunkedMessage = 100
	}

	if options.ExpireTimeOfIncompleteChunk == 0 {
		options.ExpireTimeOfIncompleteChunk = time.Minute
	}

	consumerOptions := &ConsumerOptions{
		Topic:                       options.Topic,
		Name:                        options.Name,
		SubscriptionName:            subscriptionName,
		Type:                        Exclusive,
		ReceiverQueueSize:           receiverQueueSize,
		SubscriptionMode:            NonDurable,
		ReadCompacted:               options.ReadCompacted,
		Properties:                  options.Properties,
		NackRedeliveryDelay:         defaultNackRedeliveryDelay,
		ReplicateSubscriptionState:  false,
		Decryption:                  options.Decryption,
		Schema:                      options.Schema,
		BackOffPolicyFunc:           options.BackoffPolicyFunc,
		MaxPendingChunkedMessage:    options.MaxPendingChunkedMessage,
		ExpireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk,
		AutoAckIncompleteChunk:      options.AutoAckIncompleteChunk,
		startMessageID:              startMessageID,
		StartMessageIDInclusive:     options.StartMessageIDInclusive,
	}

	reader := &reader{
		client:    client,
		messageCh: make(chan ConsumerMessage),
		log:       client.log.SubLogger(log.Fields{"topic": options.Topic}),
		metrics:   client.metrics.GetLeveledMetrics(options.Topic),
	}

	// Provide a dummy DLQ router with no DLQ policy.
	// NOTE(review): this passes options.SubscriptionName (possibly empty) rather
	// than the generated subscriptionName; presumably irrelevant because the
	// policy is nil — confirm against newDlqRouter.
	dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, options.Name,
		options.BackoffPolicyFunc, client.log)
	if err != nil {
		return nil, err
	}
	// Provide a dummy retry router with no DLQ policy.
	rlq, err := newRetryRouter(client, nil, false, options.BackoffPolicyFunc, client.log)
	if err != nil {
		return nil, err
	}

	c, err := newInternalConsumer(client, *consumerOptions, options.Topic, reader.messageCh, dlq, rlq, false)
	if err != nil {
		close(reader.messageCh)
		return nil, err
	}
	cs, ok := c.(*consumer)
	if !ok {
		// The consumer was created successfully at this point, so close it
		// before bailing out instead of abandoning it on the error path.
		c.Close()
		return nil, errors.New("invalid consumer type")
	}
	reader.c = cs

	reader.metrics.ReadersOpened.Inc()
	return reader, nil
}