in pulsar/consumer_partition.go [310:406]
func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts,
messageCh chan ConsumerMessage, dlq *dlqRouter,
metrics *internal.LeveledMetrics) (*partitionConsumer, error) {
pc := &partitionConsumer{
parentConsumer: parent,
client: client,
options: options,
topic: options.topic,
name: options.consumerName,
consumerID: client.rpcClient.NewConsumerID(),
partitionIdx: int32(options.partitionIdx),
eventsCh: make(chan interface{}, 10),
maxQueueSize: int32(options.receiverQueueSize),
queueCh: make(chan []*message, options.receiverQueueSize),
startMessageID: atomicMessageID{msgID: options.startMessageID},
connectedCh: make(chan struct{}),
messageCh: messageCh,
connectClosedCh: make(chan connectionClosed, 10),
closeCh: make(chan struct{}),
clearQueueCh: make(chan func(id *trackingMessageID)),
compressionProviders: sync.Map{},
dlq: dlq,
metrics: metrics,
schemaInfoCache: newSchemaInfoCache(client, options.topic),
}
if pc.options.autoReceiverQueueSize {
pc.currentQueueSize.Store(initialReceiverQueueSize)
pc.client.memLimit.RegisterTrigger(pc.shrinkReceiverQueueSize)
} else {
pc.currentQueueSize.Store(int32(pc.options.receiverQueueSize))
}
pc.availablePermits = &availablePermits{pc: pc}
pc.chunkedMsgCtxMap = newChunkedMsgCtxMap(options.maxPendingChunkedMessage, pc)
pc.unAckChunksTracker = newUnAckChunksTracker(pc)
pc.ackGroupingTracker = newAckGroupingTracker(options.ackGroupingOptions,
func(id MessageID) { pc.sendIndividualAck(id) },
func(id MessageID) { pc.sendCumulativeAck(id) },
func(ids []*pb.MessageIdData) { pc.eventsCh <- ids })
pc.setConsumerState(consumerInit)
pc.log = client.log.SubLogger(log.Fields{
"name": pc.name,
"topic": options.topic,
"subscription": options.subscription,
"consumerID": pc.consumerID,
})
var decryptor cryptointernal.Decryptor
if pc.options.decryption == nil {
decryptor = cryptointernal.NewNoopDecryptor() // default to noopDecryptor
} else {
decryptor = cryptointernal.NewConsumerDecryptor(
options.decryption.KeyReader,
options.decryption.MessageCrypto,
pc.log,
)
}
pc.decryptor = decryptor
pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log)
err := pc.grabConn()
if err != nil {
pc.log.WithError(err).Error("Failed to create consumer")
pc.nackTracker.Close()
pc.ackGroupingTracker.close()
pc.chunkedMsgCtxMap.Close()
return nil, err
}
pc.log.Info("Created consumer")
pc.setConsumerState(consumerReady)
startingMessageID := pc.startMessageID.get()
if pc.options.startMessageIDInclusive && startingMessageID != nil && startingMessageID.equal(latestMessageID) {
msgID, err := pc.requestGetLastMessageID()
if err != nil {
pc.Close()
return nil, err
}
if msgID.entryID != noMessageEntry {
pc.startMessageID.set(msgID)
// use the WithoutClear version because the dispatcher is not started yet
err = pc.requestSeekWithoutClear(msgID.messageID)
if err != nil {
pc.Close()
return nil, err
}
}
}
go pc.dispatcher()
go pc.runEventsLoop()
return pc, nil
}