in pulsar/reader_impl.go [45:144]
func newReader(client *client, options ReaderOptions) (Reader, error) {
if options.Topic == "" {
return nil, newError(InvalidConfiguration, "Topic is required")
}
if options.StartMessageID == nil {
return nil, newError(InvalidConfiguration, "StartMessageID is required")
}
var startMessageID *trackingMessageID
if !checkMessageIDType(options.StartMessageID) {
// a custom type satisfying MessageID may not be a messageID or trackingMessageID
// so re-create messageID using its data
deserMsgID, err := deserializeMessageID(options.StartMessageID.Serialize())
if err != nil {
return nil, err
}
// de-serialized MessageID is a messageID
startMessageID = toTrackingMessageID(deserMsgID)
} else {
startMessageID = toTrackingMessageID(options.StartMessageID)
}
subscriptionName := options.SubscriptionName
if subscriptionName == "" {
subscriptionName = options.SubscriptionRolePrefix
if subscriptionName == "" {
subscriptionName = "reader"
}
subscriptionName += "-" + generateRandomName()
}
receiverQueueSize := options.ReceiverQueueSize
if receiverQueueSize <= 0 {
receiverQueueSize = defaultReceiverQueueSize
}
// decryption is enabled, use default message crypto if not provided
if options.Decryption != nil && options.Decryption.MessageCrypto == nil {
messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt",
false,
client.log.SubLogger(log.Fields{"topic": options.Topic}))
if err != nil {
return nil, err
}
options.Decryption.MessageCrypto = messageCrypto
}
if options.MaxPendingChunkedMessage == 0 {
options.MaxPendingChunkedMessage = 100
}
if options.ExpireTimeOfIncompleteChunk == 0 {
options.ExpireTimeOfIncompleteChunk = time.Minute
}
consumerOptions := &partitionConsumerOpts{
topic: options.Topic,
consumerName: options.Name,
subscription: subscriptionName,
subscriptionType: Exclusive,
receiverQueueSize: receiverQueueSize,
startMessageID: startMessageID,
startMessageIDInclusive: options.StartMessageIDInclusive,
subscriptionMode: NonDurable,
readCompacted: options.ReadCompacted,
metadata: options.Properties,
nackRedeliveryDelay: defaultNackRedeliveryDelay,
replicateSubscriptionState: false,
decryption: options.Decryption,
schema: options.Schema,
backoffPolicy: options.BackoffPolicy,
maxPendingChunkedMessage: options.MaxPendingChunkedMessage,
expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk,
autoAckIncompleteChunk: options.AutoAckIncompleteChunk,
}
reader := &reader{
client: client,
messageCh: make(chan ConsumerMessage),
log: client.log.SubLogger(log.Fields{"topic": options.Topic}),
metrics: client.metrics.GetLeveledMetrics(options.Topic),
}
// Provide dummy dlq router with not dlq policy
dlq, err := newDlqRouter(client, nil, client.log)
if err != nil {
return nil, err
}
pc, err := newPartitionConsumer(nil, client, consumerOptions, reader.messageCh, dlq, reader.metrics)
if err != nil {
close(reader.messageCh)
return nil, err
}
reader.pc = pc
reader.metrics.ReadersOpened.Inc()
return reader, nil
}