protected ConsumerImpl()

in pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java [287:445]


    /**
     * Initializes the consumer state for {@code topic} from {@code conf}, creates the metric instruments
     * for this consumer, and finally calls {@code grabCnx()}.
     *
     * <p>Side effect on the argument: when {@code topic} is not persistent and {@code conf} asks for a
     * {@code Durable} subscription, {@code conf} is modified in place to use {@code NonDurable}.
     *
     * <p>Note that {@code this} is handed to several collaborators (negative-ack tracker, stats recorder,
     * connection handler, ack grouping tracker) before the constructor has finished, so the order of the
     * assignments below matters to anything those collaborators read from this instance.
     *
     * @param client the owning client; used for the consumer id, the client configuration (lookup timeout,
     *               stats interval, backoff intervals), the event loop group and the instrument provider
     * @param topic the topic name, parsed with {@code TopicName.get(topic)}
     * @param conf the consumer configuration; may be mutated (see above)
     * @param executorProvider forwarded to the superclass constructor
     * @param partitionIndex stored as-is in {@code this.partitionIndex}
     * @param hasParentConsumer stored as-is in {@code this.hasParentConsumer}
     * @param parentConsumerHasListener stored as-is in {@code this.parentConsumerHasListener}
     * @param subscribeFuture forwarded to the superclass constructor
     * @param startMessageId optional start position; may be {@code null}. When non-null it is cast to
     *                       {@code MessageIdAdv}
     * @param startMessageRollbackDurationInSec stored as-is in {@code this.startMessageRollbackDurationInSec}
     * @param schema forwarded to the superclass constructor
     * @param interceptors forwarded to the superclass constructor
     * @param createTopicIfDoesNotExist stored as-is in {@code this.createTopicIfDoesNotExist}
     * @throws ClassCastException if {@code startMessageId} is non-null and is not a {@code MessageIdAdv}
     */
    protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
                           ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer,
                           boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> subscribeFuture,
                           MessageId startMessageId,
                           long startMessageRollbackDurationInSec, Schema<T> schema,
                           ConsumerInterceptors<T> interceptors,
                           boolean createTopicIfDoesNotExist) {
        // The receiver queue size handed to the superclass is taken from the configuration.
        super(client, topic, conf, conf.getReceiverQueueSize(), executorProvider, subscribeFuture, schema,
                interceptors);
        this.consumerId = client.newConsumerId();

        // A Durable subscription is not used on a non-persistent topic: downgrade the configured mode to
        // NonDurable (this mutates the caller-supplied conf) and warn, before caching the mode below.
        TopicName topicName = TopicName.get(topic);
        if (!topicName.isPersistent() && conf.getSubscriptionMode().equals(SubscriptionMode.Durable)) {
            conf.setSubscriptionMode(SubscriptionMode.NonDurable);
            log.warn("[{}] Cannot create a [Durable] subscription for a NonPersistentTopic, "
                    + "will use [NonDurable] to subscribe. Subscription name: {}", topic, conf.getSubscriptionName());
        }
        this.subscriptionMode = conf.getSubscriptionMode();
        // Resolve the start position. When no startMessageId is given, this.startMessageId is not assigned
        // here and keeps whatever value the field already has (presumably null -- confirm at the declaration).
        if (startMessageId != null) {
            MessageIdAdv firstChunkMessageId = ((MessageIdAdv) startMessageId).getFirstChunkMessageId();
            if (conf.isResetIncludeHead() && firstChunkMessageId != null) {
                // The chunk message id's ledger id and entry id are the last chunk's ledger id and entry id, when
                // startMessageIdInclusive() is enabled, we need to start from the first chunk's message id
                this.startMessageId = firstChunkMessageId;
            } else {
                this.startMessageId = (MessageIdAdv) startMessageId;
            }
        }
        // Keep a copy of the start position as resolved at construction time.
        this.initialStartMessageId = this.startMessageId;
        this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
        // Set the available-permits counter to 0 through its field updater.
        AVAILABLE_PERMITS_UPDATER.set(this, 0);
        // Absolute wall-clock deadline in milliseconds: now plus the configured lookup timeout.
        this.lookupDeadline = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs();
        this.partitionIndex = partitionIndex;
        this.hasParentConsumer = hasParentConsumer;
        this.parentConsumerHasListener = parentConsumerHasListener;
        // The priority level comes from the topic configuration that conf matches to this topic,
        // not directly from conf itself.
        this.priorityLevel = conf.getMatchingTopicConfiguration(topic).getPriorityLevel();
        this.readCompacted = conf.isReadCompacted();
        this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
        // NOTE(review): `this` escapes to the tracker here while later fields are still unassigned.
        this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
        this.resetIncludeHead = conf.isResetIncludeHead();
        this.createTopicIfDoesNotExist = createTopicIfDoesNotExist;
        // Chunked-message settings, copied from the configuration; the uuid queue starts empty.
        this.maxPendingChunkedMessage = conf.getMaxPendingChunkedMessage();
        this.pendingChunkedMessageUuidQueue = new GrowableArrayBlockingQueue<>();
        this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis();
        this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();
        this.poolMessages = conf.isPoolMessages();
        this.paused = conf.isStartPaused();

        // A real stats recorder is created only when the client's stats interval is positive;
        // otherwise the shared disabled instance is used.
        if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
            stats = new ConsumerStatsRecorderImpl(client, conf, this);
        } else {
            stats = ConsumerStatsDisabled.INSTANCE;
        }

        seekStatus = new AtomicReference<>(SeekStatus.NOT_STARTED);

        // Create msgCrypto if not created already
        // Crypto is set up only when a crypto key reader is configured. A MessageCrypto supplied in conf wins;
        // otherwise a MessageCryptoBc is instantiated.
        if (conf.getCryptoKeyReader() != null) {
            if (conf.getMessageCrypto() != null) {
                this.msgCrypto = conf.getMessageCrypto();
            } else {
                // default to use MessageCryptoBc;
                MessageCrypto msgCryptoBc;
                try {
                    // `subscription` is not assigned in this constructor; presumably it is set by the
                    // superclass constructor -- confirm.
                    msgCryptoBc = new MessageCryptoBc(
                            String.format("[%s] [%s]", topic, subscription),
                            false);
                } catch (Exception e) {
                    // Failure is only logged: msgCrypto ends up null even though a key reader is configured.
                    log.error("MessageCryptoBc may not included in the jar. e:", e);
                    msgCryptoBc = null;
                }
                this.msgCrypto = msgCryptoBc;
            }
        } else {
            this.msgCrypto = null;
        }

        // Take an unmodifiable snapshot copy of the configured properties, so later changes to the
        // map held by conf are not reflected in `metadata`.
        if (conf.getProperties().isEmpty()) {
            metadata = Collections.emptyMap();
        } else {
            metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
        }

        // Connection handler with a backoff built from the client's initial/max backoff intervals and a
        // mandatory stop of 0 ms. `this` is passed for both the first and the last constructor argument.
        this.connectionHandler = new ConnectionHandler(this,
                new BackoffBuilder()
                        .setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
                                TimeUnit.NANOSECONDS)
                        .setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
                        .setMandatoryStop(0, TimeUnit.MILLISECONDS)
                        .create(),
                this);

        // Same TopicName.get(topic) call as the local variable above, this time stored in the field.
        this.topicName = TopicName.get(topic);
        // Persistent topics get a grouping tracker bound to the client's event loop group;
        // non-persistent topics use the instance returned by NonPersistentAcknowledgmentGroupingTracker.of().
        if (this.topicName.isPersistent()) {
            this.acknowledgmentsGroupingTracker =
                    new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
        } else {
            this.acknowledgmentsGroupingTracker =
                    NonPersistentAcknowledgmentGroupingTracker.of();
        }

        // Dead-letter handling: when a policy is configured, build a separate DeadLetterPolicy instance
        // rather than reusing the one held by conf, filling in defaults for blank topic names.
        if (conf.getDeadLetterPolicy() != null) {
            possibleSendToDeadLetterTopicMessages = new ConcurrentHashMap<>();
            // Dead-letter topic: the configured name if not blank, else one derived from topic + subscription.
            if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) {
                this.deadLetterPolicy = DeadLetterPolicy.builder()
                        .maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount())
                        .deadLetterTopic(conf.getDeadLetterPolicy().getDeadLetterTopic())
                        .build();
            } else {
                this.deadLetterPolicy = DeadLetterPolicy.builder()
                        .maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount())
                        .deadLetterTopic(RetryMessageUtil.getDLQTopic(topic, subscription))
                        .build();
            }
            // Retry-letter topic: the configured name if not blank, else one derived from topic + subscription.
            if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) {
                this.deadLetterPolicy.setRetryLetterTopic(conf.getDeadLetterPolicy().getRetryLetterTopic());
            } else {
                this.deadLetterPolicy.setRetryLetterTopic(RetryMessageUtil.getRetryTopic(topic, subscription));
            }
            // The initial subscription name is copied only when one is configured; there is no fallback here.
            if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getInitialSubscriptionName())) {
                this.deadLetterPolicy.setInitialSubscriptionName(
                        conf.getDeadLetterPolicy().getInitialSubscriptionName());
            }
            // The producer-builder customizers are copied unconditionally (whatever conf returns, possibly null).
            this.deadLetterPolicy.setRetryLetterProducerBuilderCustomizer(
                    conf.getDeadLetterPolicy().getRetryLetterProducerBuilderCustomizer());
            this.deadLetterPolicy.setDeadLetterProducerBuilderCustomizer(
                    conf.getDeadLetterPolicy().getDeadLetterProducerBuilderCustomizer());
        } else {
            deadLetterPolicy = null;
            possibleSendToDeadLetterTopicMessages = null;
        }

        topicNameWithoutPartition = topicName.getPartitionedTopicName();

        // Metric instruments. Every instrument is created for `topic` and carries the same
        // "pulsar.subscription" attribute.
        InstrumentProvider ip = client.instrumentProvider();
        Attributes attrs = Attributes.builder().put("pulsar.subscription", subscription).build();
        consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
                "The number of consumer sessions opened", topic, attrs);
        consumersClosedCounter = ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions,
                "The number of consumer sessions closed", topic, attrs);
        messagesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.count", Unit.Messages,
                "The number of messages explicitly received by the consumer application", topic, attrs);
        bytesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.size", Unit.Bytes,
                "The number of bytes explicitly received by the consumer application", topic, attrs);
        // The two receive-queue instruments are up/down counters, unlike the plain counters around them.
        messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.count", Unit.Messages,
                "The number of messages currently sitting in the consumer receive queue", topic, attrs);
        bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.size", Unit.Bytes,
                "The total size in bytes of messages currently sitting in the consumer receive queue", topic, attrs);

        consumerAcksCounter = ip.newCounter("pulsar.client.consumer.message.ack", Unit.Messages,
                "The number of acknowledged messages", topic, attrs);
        consumerNacksCounter = ip.newCounter("pulsar.client.consumer.message.nack", Unit.Messages,
                "The number of negatively acknowledged messages", topic, attrs);
        consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.message.dlq", Unit.Messages,
                "The number of messages sent to DLQ", topic, attrs);
        // Called only after all of the state above has been assigned.
        // NOTE(review): presumably this starts connecting to the broker -- confirm in grabCnx().
        grabCnx();

        // The "opened" counter is incremented after grabCnx() has returned.
        consumersOpenedCounter.increment();
    }