in pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java [287:445]
protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer,
boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> subscribeFuture,
MessageId startMessageId,
long startMessageRollbackDurationInSec, Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, conf.getReceiverQueueSize(), executorProvider, subscribeFuture, schema,
interceptors);
this.consumerId = client.newConsumerId();
TopicName topicName = TopicName.get(topic);
if (!topicName.isPersistent() && conf.getSubscriptionMode().equals(SubscriptionMode.Durable)) {
conf.setSubscriptionMode(SubscriptionMode.NonDurable);
log.warn("[{}] Cannot create a [Durable] subscription for a NonPersistentTopic, "
+ "will use [NonDurable] to subscribe. Subscription name: {}", topic, conf.getSubscriptionName());
}
this.subscriptionMode = conf.getSubscriptionMode();
if (startMessageId != null) {
MessageIdAdv firstChunkMessageId = ((MessageIdAdv) startMessageId).getFirstChunkMessageId();
if (conf.isResetIncludeHead() && firstChunkMessageId != null) {
// The chunk message id's ledger id and entry id are the last chunk's ledger id and entry id, when
// startMessageIdInclusive() is enabled, we need to start from the first chunk's message id
this.startMessageId = firstChunkMessageId;
} else {
this.startMessageId = (MessageIdAdv) startMessageId;
}
}
this.initialStartMessageId = this.startMessageId;
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
AVAILABLE_PERMITS_UPDATER.set(this, 0);
this.lookupDeadline = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs();
this.partitionIndex = partitionIndex;
this.hasParentConsumer = hasParentConsumer;
this.parentConsumerHasListener = parentConsumerHasListener;
this.priorityLevel = conf.getMatchingTopicConfiguration(topic).getPriorityLevel();
this.readCompacted = conf.isReadCompacted();
this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
this.resetIncludeHead = conf.isResetIncludeHead();
this.createTopicIfDoesNotExist = createTopicIfDoesNotExist;
this.maxPendingChunkedMessage = conf.getMaxPendingChunkedMessage();
this.pendingChunkedMessageUuidQueue = new GrowableArrayBlockingQueue<>();
this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis();
this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();
this.poolMessages = conf.isPoolMessages();
this.paused = conf.isStartPaused();
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStatsRecorderImpl(client, conf, this);
} else {
stats = ConsumerStatsDisabled.INSTANCE;
}
seekStatus = new AtomicReference<>(SeekStatus.NOT_STARTED);
// Create msgCrypto if not created already
if (conf.getCryptoKeyReader() != null) {
if (conf.getMessageCrypto() != null) {
this.msgCrypto = conf.getMessageCrypto();
} else {
// default to use MessageCryptoBc;
MessageCrypto msgCryptoBc;
try {
msgCryptoBc = new MessageCryptoBc(
String.format("[%s] [%s]", topic, subscription),
false);
} catch (Exception e) {
log.error("MessageCryptoBc may not included in the jar. e:", e);
msgCryptoBc = null;
}
this.msgCrypto = msgCryptoBc;
}
} else {
this.msgCrypto = null;
}
if (conf.getProperties().isEmpty()) {
metadata = Collections.emptyMap();
} else {
metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
}
this.connectionHandler = new ConnectionHandler(this,
new BackoffBuilder()
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create(),
this);
this.topicName = TopicName.get(topic);
if (this.topicName.isPersistent()) {
this.acknowledgmentsGroupingTracker =
new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
} else {
this.acknowledgmentsGroupingTracker =
NonPersistentAcknowledgmentGroupingTracker.of();
}
if (conf.getDeadLetterPolicy() != null) {
possibleSendToDeadLetterTopicMessages = new ConcurrentHashMap<>();
if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) {
this.deadLetterPolicy = DeadLetterPolicy.builder()
.maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount())
.deadLetterTopic(conf.getDeadLetterPolicy().getDeadLetterTopic())
.build();
} else {
this.deadLetterPolicy = DeadLetterPolicy.builder()
.maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount())
.deadLetterTopic(RetryMessageUtil.getDLQTopic(topic, subscription))
.build();
}
if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) {
this.deadLetterPolicy.setRetryLetterTopic(conf.getDeadLetterPolicy().getRetryLetterTopic());
} else {
this.deadLetterPolicy.setRetryLetterTopic(RetryMessageUtil.getRetryTopic(topic, subscription));
}
if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getInitialSubscriptionName())) {
this.deadLetterPolicy.setInitialSubscriptionName(
conf.getDeadLetterPolicy().getInitialSubscriptionName());
}
this.deadLetterPolicy.setRetryLetterProducerBuilderCustomizer(
conf.getDeadLetterPolicy().getRetryLetterProducerBuilderCustomizer());
this.deadLetterPolicy.setDeadLetterProducerBuilderCustomizer(
conf.getDeadLetterPolicy().getDeadLetterProducerBuilderCustomizer());
} else {
deadLetterPolicy = null;
possibleSendToDeadLetterTopicMessages = null;
}
topicNameWithoutPartition = topicName.getPartitionedTopicName();
InstrumentProvider ip = client.instrumentProvider();
Attributes attrs = Attributes.builder().put("pulsar.subscription", subscription).build();
consumersOpenedCounter = ip.newCounter("pulsar.client.consumer.opened", Unit.Sessions,
"The number of consumer sessions opened", topic, attrs);
consumersClosedCounter = ip.newCounter("pulsar.client.consumer.closed", Unit.Sessions,
"The number of consumer sessions closed", topic, attrs);
messagesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.count", Unit.Messages,
"The number of messages explicitly received by the consumer application", topic, attrs);
bytesReceivedCounter = ip.newCounter("pulsar.client.consumer.message.received.size", Unit.Bytes,
"The number of bytes explicitly received by the consumer application", topic, attrs);
messagesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.count", Unit.Messages,
"The number of messages currently sitting in the consumer receive queue", topic, attrs);
bytesPrefetchedGauge = ip.newUpDownCounter("pulsar.client.consumer.receive_queue.size", Unit.Bytes,
"The total size in bytes of messages currently sitting in the consumer receive queue", topic, attrs);
consumerAcksCounter = ip.newCounter("pulsar.client.consumer.message.ack", Unit.Messages,
"The number of acknowledged messages", topic, attrs);
consumerNacksCounter = ip.newCounter("pulsar.client.consumer.message.nack", Unit.Messages,
"The number of negatively acknowledged messages", topic, attrs);
consumerDlqMessagesCounter = ip.newCounter("pulsar.client.consumer.message.dlq", Unit.Messages,
"The number of messages sent to DLQ", topic, attrs);
grabCnx();
consumersOpenedCounter.increment();
}