in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java [911:1083]
private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
long consumerId, SubType subType, int priorityLevel,
String consumerName, boolean isDurable,
MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted,
InitialPosition initialPosition,
long startMessageRollbackDurationSec,
Boolean replicatedSubscriptionStateArg,
KeySharedMeta keySharedMeta,
Map<String, String> subscriptionProperties,
long consumerEpoch,
SchemaType schemaType) {
if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) {
return FutureUtil.failedFuture(new NotAllowedException(
"readCompacted only allowed on failover or exclusive subscriptions"));
}
return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
Boolean replicatedSubscriptionState = replicatedSubscriptionStateArg;
if (replicatedSubscriptionState != null && replicatedSubscriptionState
&& !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
log.warn("[{}] Replicated Subscription is disabled by broker.", getName());
replicatedSubscriptionState = false;
}
if (subType == SubType.Key_Shared
&& !brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) {
return FutureUtil.failedFuture(
new NotAllowedException("Key_Shared subscription is disabled by broker."));
}
try {
if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
&& !checkSubscriptionTypesEnable(subType)) {
return FutureUtil.failedFuture(
new NotAllowedException("Topic[{" + topic + "}] doesn't support "
+ subType.name() + " sub type!"));
}
} catch (Exception e) {
return FutureUtil.failedFuture(e);
}
if (isBlank(subscriptionName)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Empty subscription name", topic);
}
return FutureUtil.failedFuture(new NamingException("Empty subscription name"));
}
if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName);
}
return FutureUtil.failedFuture(
new UnsupportedVersionException("Consumer doesn't support batch-message"));
}
if (subscriptionName.startsWith(replicatorPrefix)
|| subscriptionName.equals(DEDUPLICATION_CURSOR_NAME)) {
log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName);
return FutureUtil.failedFuture(
new NamingException("Subscription with reserved subscription name attempted"));
}
if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")
&& subscribeRateLimiter.isPresent()) {
SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier(
cnx.clientAddress().toString().split(":")[0], consumerName, consumerId);
if (!subscribeRateLimiter.get().subscribeAvailable(consumer)
|| !subscribeRateLimiter.get().tryAcquire(consumer)) {
log.warn("[{}] Failed to create subscription for {} {} limited by {}, available {}",
topic, subscriptionName, consumer, subscribeRateLimiter.get().getSubscribeRate(),
subscribeRateLimiter.get().getAvailableSubscribeRateLimit(consumer));
return FutureUtil.failedFuture(
new NotAllowedException("Subscribe limited by subscribe rate limit per consumer."));
}
}
lock.readLock().lock();
try {
if (isFenced) {
log.warn("[{}] Attempting to subscribe to a fenced topic", topic);
return FutureUtil.failedFuture(new TopicFencedException("Topic is temporarily unavailable"));
}
handleConsumerAdded(subscriptionName, consumerName);
} finally {
lock.readLock().unlock();
}
CompletableFuture<? extends Subscription> subscriptionFuture = isDurable
? getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionState, subscriptionProperties)
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
startMessageRollbackDurationSec, readCompacted, subscriptionProperties);
CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
consumerName, isDurable, cnx, cnx.getAuthRole(), metadata,
readCompacted, keySharedMeta, startMessageId, consumerEpoch, schemaType);
return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
if (subscription instanceof PersistentSubscription persistentSubscription) {
checkBackloggedCursor(persistentSubscription);
}
if (!cnx.isActive()) {
try {
consumer.close();
} catch (BrokerServiceException e) {
if (e instanceof ConsumerBusyException) {
log.warn("[{}][{}] Consumer {} {} already connected: {}",
topic, subscriptionName, consumerId, consumerName, e.getMessage());
} else if (e instanceof SubscriptionBusyException) {
log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage());
}
decrementUsageCount();
return FutureUtil.failedFuture(e);
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName,
consumer.consumerName(), currentUsageCount());
}
decrementUsageCount();
return FutureUtil.failedFuture(
new BrokerServiceException.ConnectionClosedException(
"Connection was closed while the opening the cursor "));
} else {
checkReplicatedSubscriptionControllerState();
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId);
}
return CompletableFuture.completedFuture(consumer);
}
});
});
future.exceptionally(ex -> {
decrementUsageCount();
if (ex.getCause() instanceof ConsumerBusyException) {
log.warn("[{}][{}] Consumer {} {} already connected: {}", topic, subscriptionName, consumerId,
consumerName, ex.getCause().getMessage());
Consumer consumer = null;
try {
consumer = subscriptionFuture.isDone() ? getActiveConsumer(subscriptionFuture.get()) : null;
// cleanup consumer if connection is already closed
if (consumer != null && !consumer.cnx().isActive()) {
consumer.close();
}
} catch (Exception be) {
log.error("Failed to clean up consumer on closed connection {}, {}", consumer, be.getMessage());
}
} else if (ex.getCause() instanceof SubscriptionBusyException) {
log.warn("[{}][{}] {}", topic, subscriptionName, ex.getMessage());
} else if (ex.getCause() instanceof BrokerServiceException.SubscriptionFencedException
&& isCompactionSubscription(subscriptionName)) {
log.warn("[{}] Failed to create compaction subscription: {}", topic, ex.getMessage());
} else if (ex.getCause() instanceof ManagedLedgerFencedException) {
// If the topic has been fenced, we cannot continue using it. We need to close and reopen
log.warn("[{}][{}] has been fenced. closing the topic {}", topic, subscriptionName,
ex.getMessage());
close();
} else if (ex.getCause() instanceof BrokerServiceException.ConnectionClosedException) {
log.warn("[{}][{}] Connection was closed while the opening the cursor", topic, subscriptionName);
} else {
log.error("[{}] Failed to create subscription: {}", topic, subscriptionName, ex);
}
return null;
});
return future;
});
}