private CompletableFuture internalSubscribe()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java [911:1083]


    private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
                                                          long consumerId, SubType subType, int priorityLevel,
                                                          String consumerName, boolean isDurable,
                                                          MessageId startMessageId,
                                                          Map<String, String> metadata, boolean readCompacted,
                                                          InitialPosition initialPosition,
                                                          long startMessageRollbackDurationSec,
                                                          Boolean replicatedSubscriptionStateArg,
                                                          KeySharedMeta keySharedMeta,
                                                          Map<String, String> subscriptionProperties,
                                                          long consumerEpoch,
                                                          SchemaType schemaType) {
        if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) {
            return FutureUtil.failedFuture(new NotAllowedException(
                    "readCompacted only allowed on failover or exclusive subscriptions"));
        }

        return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
            Boolean replicatedSubscriptionState = replicatedSubscriptionStateArg;
            if (replicatedSubscriptionState != null && replicatedSubscriptionState
                    && !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
                log.warn("[{}] Replicated Subscription is disabled by broker.", getName());
                replicatedSubscriptionState = false;
            }

            if (subType == SubType.Key_Shared
                    && !brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) {
                return FutureUtil.failedFuture(
                        new NotAllowedException("Key_Shared subscription is disabled by broker."));
            }

            try {
                if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
                        && !checkSubscriptionTypesEnable(subType)) {
                    return FutureUtil.failedFuture(
                            new NotAllowedException("Topic[{" + topic + "}] doesn't support "
                                    + subType.name() + " sub type!"));
                }
            } catch (Exception e) {
                return FutureUtil.failedFuture(e);
            }

            if (isBlank(subscriptionName)) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Empty subscription name", topic);
                }
                return FutureUtil.failedFuture(new NamingException("Empty subscription name"));
            }

            if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName);
                }
                return FutureUtil.failedFuture(
                        new UnsupportedVersionException("Consumer doesn't support batch-message"));
            }

            if (subscriptionName.startsWith(replicatorPrefix)
                    || subscriptionName.equals(DEDUPLICATION_CURSOR_NAME)) {
                log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName);
                return FutureUtil.failedFuture(
                        new NamingException("Subscription with reserved subscription name attempted"));
            }

            if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")
                    && subscribeRateLimiter.isPresent()) {
                SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier(
                        cnx.clientAddress().toString().split(":")[0], consumerName, consumerId);
                if (!subscribeRateLimiter.get().subscribeAvailable(consumer)
                        || !subscribeRateLimiter.get().tryAcquire(consumer)) {
                    log.warn("[{}] Failed to create subscription for {} {} limited by {}, available {}",
                            topic, subscriptionName, consumer, subscribeRateLimiter.get().getSubscribeRate(),
                            subscribeRateLimiter.get().getAvailableSubscribeRateLimit(consumer));
                    return FutureUtil.failedFuture(
                            new NotAllowedException("Subscribe limited by subscribe rate limit per consumer."));
                }
            }

            lock.readLock().lock();
            try {
                if (isFenced) {
                    log.warn("[{}] Attempting to subscribe to a fenced topic", topic);
                    return FutureUtil.failedFuture(new TopicFencedException("Topic is temporarily unavailable"));
                }
                handleConsumerAdded(subscriptionName, consumerName);
            } finally {
                lock.readLock().unlock();
            }

            CompletableFuture<? extends Subscription> subscriptionFuture = isDurable
                    ? getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
                    replicatedSubscriptionState, subscriptionProperties)
                    : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
                    startMessageRollbackDurationSec, readCompacted, subscriptionProperties);

            CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
                Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
                        consumerName, isDurable, cnx, cnx.getAuthRole(), metadata,
                        readCompacted, keySharedMeta, startMessageId, consumerEpoch, schemaType);

                return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
                    if (subscription instanceof PersistentSubscription persistentSubscription) {
                        checkBackloggedCursor(persistentSubscription);
                    }
                    if (!cnx.isActive()) {
                        try {
                            consumer.close();
                        } catch (BrokerServiceException e) {
                            if (e instanceof ConsumerBusyException) {
                                log.warn("[{}][{}] Consumer {} {} already connected: {}",
                                        topic, subscriptionName, consumerId, consumerName, e.getMessage());
                            } else if (e instanceof SubscriptionBusyException) {
                                log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage());
                            }

                            decrementUsageCount();
                            return FutureUtil.failedFuture(e);
                        }
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName,
                                    consumer.consumerName(), currentUsageCount());
                        }

                        decrementUsageCount();
                        return FutureUtil.failedFuture(
                                new BrokerServiceException.ConnectionClosedException(
                                        "Connection was closed while the opening the cursor "));
                    } else {
                        checkReplicatedSubscriptionControllerState();
                        if (log.isDebugEnabled()) {
                            log.debug("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId);
                        }
                        return CompletableFuture.completedFuture(consumer);
                    }
                });
            });

            future.exceptionally(ex -> {
                decrementUsageCount();

                if (ex.getCause() instanceof ConsumerBusyException) {
                    log.warn("[{}][{}] Consumer {} {} already connected: {}", topic, subscriptionName, consumerId,
                            consumerName, ex.getCause().getMessage());
                    Consumer consumer = null;
                    try {
                        consumer = subscriptionFuture.isDone() ? getActiveConsumer(subscriptionFuture.get()) : null;
                        // cleanup consumer if connection is already closed
                        if (consumer != null && !consumer.cnx().isActive()) {
                            consumer.close();
                        }
                    } catch (Exception be) {
                        log.error("Failed to clean up consumer on closed connection {}, {}", consumer, be.getMessage());
                    }
                } else if (ex.getCause() instanceof SubscriptionBusyException) {
                    log.warn("[{}][{}] {}", topic, subscriptionName, ex.getMessage());
                } else if (ex.getCause() instanceof BrokerServiceException.SubscriptionFencedException
                        && isCompactionSubscription(subscriptionName)) {
                    log.warn("[{}] Failed to create compaction subscription: {}", topic, ex.getMessage());
                } else if (ex.getCause() instanceof ManagedLedgerFencedException) {
                    // If the topic has been fenced, we cannot continue using it. We need to close and reopen
                    log.warn("[{}][{}] has been fenced. closing the topic {}", topic, subscriptionName,
                            ex.getMessage());
                    close();
                } else if (ex.getCause() instanceof BrokerServiceException.ConnectionClosedException) {
                    log.warn("[{}][{}] Connection was closed while the opening the cursor", topic, subscriptionName);
                } else {
                    log.error("[{}] Failed to create subscription: {}", topic, subscriptionName, ex);
                }
                return null;
            });
            return future;
        });
    }