func newPartitionConsumer()

in pulsar/consumer_partition.go [344:459]


// newPartitionConsumer builds a partitionConsumer for options.topic, connects
// it through grabConn, and then starts its dispatcher and events-loop
// goroutines.
//
// parent, client, options, messageCh, dlq and metrics are stored on the
// returned consumer unchanged. When options.backOffPolicyFunc is nil,
// backoff.NewDefaultBackoff is used as the backoff policy factory.
//
// On failure it returns (nil, err):
//   - if grabConn fails, the trackers created here are closed and the
//     consumer's context is cancelled;
//   - if resolving the last message ID or seeking to it fails, pc.Close is
//     called.
//
// NOTE(review): on the two later failure paths pc.Close runs before
// dispatcher/runEventsLoop have been started — confirm Close does not depend
// on those goroutines being alive.
func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts,
	messageCh chan ConsumerMessage, dlq *dlqRouter,
	metrics *internal.LeveledMetrics) (*partitionConsumer, error) {
	// Fall back to the default backoff policy when the caller supplied none.
	boFunc := options.backOffPolicyFunc
	if boFunc == nil {
		boFunc = backoff.NewDefaultBackoff
	}

	ctx, cancelFunc := context.WithCancel(context.Background())
	pc := &partitionConsumer{
		parentConsumer:             parent,
		client:                     client,
		options:                    options,
		cnxKeySuffix:               client.cnxPool.GenerateRoundRobinIndex(),
		topic:                      options.topic,
		name:                       options.consumerName,
		consumerID:                 client.rpcClient.NewConsumerID(),
		partitionIdx:               int32(options.partitionIdx),
		eventsCh:                   make(chan interface{}, 10),
		maxQueueSize:               int32(options.receiverQueueSize),
		queueCh:                    make(chan []*message, options.receiverQueueSize),
		startMessageID:             atomicMessageID{msgID: options.startMessageID},
		connectedCh:                make(chan struct{}),
		messageCh:                  messageCh,
		connectClosedCh:            make(chan *connectionClosed, 1),
		closeCh:                    make(chan struct{}),
		clearQueueCh:               make(chan func(id *trackingMessageID)),
		compressionProviders:       sync.Map{},
		dlq:                        dlq,
		metrics:                    metrics,
		schemaInfoCache:            newSchemaInfoCache(client, options.topic),
		backoffPolicyFunc:          boFunc,
		dispatcherSeekingControlCh: make(chan struct{}),
		ctx:                        ctx,
		cancelFunc:                 cancelFunc,
	}

	// With auto-sizing the queue starts at initialReceiverQueueSize and
	// shrinkReceiverQueueSize is registered as a trigger on the client's
	// memory limit; otherwise the configured size is used directly.
	if pc.options.autoReceiverQueueSize {
		pc.currentQueueSize.Store(initialReceiverQueueSize)
		pc.client.memLimit.RegisterTrigger(pc.shrinkReceiverQueueSize)
	} else {
		pc.currentQueueSize.Store(int32(pc.options.receiverQueueSize))
	}

	// These helpers hold a reference back to pc, so they can only be wired up
	// once pc itself exists.
	pc.availablePermits = &availablePermits{pc: pc}
	pc.chunkedMsgCtxMap = newChunkedMsgCtxMap(options.maxPendingChunkedMessage, pc)
	pc.unAckChunksTracker = newUnAckChunksTracker(pc)
	pc.ackGroupingTracker = newAckGroupingTracker(options.ackGroupingOptions,
		func(id MessageID) { pc.sendIndividualAck(id) },
		func(id MessageID) { pc.sendCumulativeAck(id) },
		func(ids []*pb.MessageIdData) {
			// Batched acks are queued on the events channel.
			pc.eventsCh <- &ackListRequest{
				errCh:  nil, // ignore the error
				msgIDs: ids,
			}
		})
	pc.setConsumerState(consumerInit)
	pc.log = client.log.SubLogger(log.Fields{
		"name":         pc.name,
		"topic":        options.topic,
		"subscription": options.subscription,
		"consumerID":   pc.consumerID,
	})

	// Without decryption settings, default to the no-op decryptor.
	if pc.options.decryption == nil {
		pc.decryptor = cryptointernal.NewNoopDecryptor()
	} else {
		pc.decryptor = cryptointernal.NewConsumerDecryptor(
			options.decryption.KeyReader,
			options.decryption.MessageCrypto,
			pc.log,
		)
	}

	pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, options.nackBackoffPolicy, pc.log)

	if err := pc.grabConn(""); err != nil {
		pc.log.WithError(err).Error("Failed to create consumer")
		pc.nackTracker.Close()
		pc.ackGroupingTracker.close()
		pc.chunkedMsgCtxMap.Close()
		// The consumer is discarded and never reaches the caller, so nothing
		// else can cancel its context; release it here.
		pc.cancelFunc()
		return nil, err
	}
	pc.log.Info("Created consumer")
	pc.setConsumerState(consumerReady)

	// An inclusive start at latestMessageID is resolved to the last message ID
	// obtained from requestGetLastMessageID, and the consumer seeks to it
	// unless that ID's entryID is noMessageEntry.
	startingMessageID := pc.startMessageID.get()
	if pc.options.startMessageIDInclusive && startingMessageID != nil && startingMessageID.equal(latestMessageID) {
		msgIDResp, err := pc.requestGetLastMessageID()
		if err != nil {
			pc.Close()
			return nil, err
		}
		msgID := convertToMessageID(msgIDResp.GetLastMessageId())
		if msgID.entryID != noMessageEntry {
			pc.startMessageID.set(msgID)

			// use the WithoutClear version because the dispatcher is not started yet
			if err := pc.requestSeekWithoutClear(msgID.messageID); err != nil {
				pc.Close()
				return nil, err
			}
		}
	}

	go pc.dispatcher()

	go pc.runEventsLoop()

	return pc, nil
}