protected void handleSubscribe()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java [1216:1486]


    /**
     * Handles a {@code CommandSubscribe} request received on this connection.
     *
     * <p>Overall flow, as implemented below:
     * <ol>
     *   <li>Check that the connection is in {@code State.Connected} and validate the topic name; the method
     *       returns immediately when {@code validateTopicName} yields {@code null}.</li>
     *   <li>Read the subscribe options from the command, substituting defaults for optional fields that are
     *       not set (empty consumer name, priority level 0, rollback duration -1,
     *       {@code DEFAULT_CONSUMER_EPOCH}, {@code emptyKeySharedMeta}, and {@code null} for the start message
     *       id, schema and replicated-state flag).</li>
     *   <li>Start the {@code TopicOperation.CONSUME} authorization check and, before its result is known,
     *       register a fresh consumer future under {@code consumerId} with {@code putIfAbsent}.</li>
     *   <li>Once authorized: validate the consumer metadata, answer directly if another future was already
     *       registered for the same consumer id, otherwise look up (or create) the topic, enforce the
     *       max-consumers check and the subscription auto-creation rule, optionally register/check the
     *       schema, and subscribe.</li>
     *   <li>On success the consumer future is completed and a success response is sent; on failure an error
     *       response is sent (only if the future had not already been completed) and the future is removed
     *       from the {@code consumers} map.</li>
     * </ol>
     *
     * <p>All work after the authorization check runs in callbacks; this method itself does not wait for the
     * outcome.
     *
     * @param subscribe the subscribe command sent by the client
     */
    protected void handleSubscribe(final CommandSubscribe subscribe) {
        checkArgument(state == State.Connected);
        final long requestId = subscribe.getRequestId();
        final long consumerId = subscribe.getConsumerId();
        // NOTE(review): a null result is treated as "already handled"; presumably validateTopicName has
        // reported the problem to the client itself - confirm against its implementation.
        TopicName topicName = validateTopicName(subscribe.getTopic(), requestId, subscribe);
        if (topicName == null) {
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Handle subscribe command: auth role = {}, original auth role = {}",
                remoteAddress, authRole, originalPrincipal);
        }

        // Extract the subscribe options; optional fields fall back to the defaults shown on each line.
        final String subscriptionName = subscribe.getSubscription();
        final SubType subType = subscribe.getSubType();
        final String consumerName = subscribe.hasConsumerName() ? subscribe.getConsumerName() : "";
        final boolean isDurable = subscribe.isDurable();
        final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(
                subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
                subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
                : null;
        final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
        final boolean readCompacted = subscribe.hasReadCompacted() && subscribe.isReadCompacted();
        final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
        final InitialPosition initialPosition = subscribe.getInitialPosition();
        final long startMessageRollbackDurationSec = subscribe.hasStartMessageRollbackDurationSec()
                ? subscribe.getStartMessageRollbackDurationSec()
                : -1;
        final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
        // Deliberately a boxed Boolean: null means the client did not set the field at all.
        final Boolean isReplicated =
                subscribe.hasReplicateSubscriptionState() ? subscribe.isReplicateSubscriptionState() : null;
        final boolean forceTopicCreation = subscribe.isForceTopicCreation();
        final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
              ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
              : emptyKeySharedMeta;
        final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH;
        final Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
                subscribe.getSubscriptionPropertiesList());

        if (log.isDebugEnabled()) {
            log.debug("Topic name = {}, subscription name = {}, schema is {}", topicName, subscriptionName,
                    schema == null ? "absent" : "present");
        }

        CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
                topicName,
                subscriptionName,
                TopicOperation.CONSUME
        );

        // Make sure the consumer future is put into the consumers map first, to avoid the same consumer
        // epoch using different consumer futures. The consumer future is removed from the map only if the
        // subscribe fails.
        // A non-null existingConsumerFuture is treated below as "another request already registered this
        // consumerId". NOTE(review): the consumers.remove(consumerId, consumerFuture) calls in this method
        // are presumably no-ops in that case (remove-if-mapped-to-this-value semantics) - confirm against the
        // map implementation.
        CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
        CompletableFuture<Consumer> existingConsumerFuture =
                consumers.putIfAbsent(consumerId, consumerFuture);
        isAuthorizedFuture.thenApply(isAuthorized -> {
            if (isAuthorized) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Client is authorized to subscribe with role {}",
                            remoteAddress, getPrincipal());
                }

                log.info("[{}] Subscribing on topic {} / {}. consumerId: {}, role: {}", this.toString(), topicName,
                        subscriptionName, consumerId, getPrincipal());
                // Reject consumer metadata that fails validation against the configured maximum size.
                try {
                    Metadata.validateMetadata(metadata,
                            service.getPulsar().getConfiguration().getMaxConsumerMetadataSize());
                } catch (IllegalArgumentException iae) {
                    final String msg = iae.getMessage();
                    consumers.remove(consumerId, consumerFuture);
                    commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
                    return null;
                }

                // Another future was already registered for this consumerId: answer according to its state
                // and do not start a second subscribe.
                if (existingConsumerFuture != null) {
                    if (!existingConsumerFuture.isDone()){
                        // There was an earlier request to create a consumer with the same consumerId. This can
                        // happen when the client timeout is lower than the broker timeouts. We need to wait
                        // until the previous consumer creation request either completes or fails.
                        log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
                                + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
                        commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                                "Consumer is already present on the connection");
                    } else if (existingConsumerFuture.isCompletedExceptionally()){
                        log.warn("[{}][{}][{}] A failed consumer with id is already present on the connection,"
                                + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
                        ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true,
                                String.format("A failed consumer with id is already present on the connection."
                                                + " consumerId: %s, remoteAddress: %s, subscription: %s",
                                        consumerId, remoteAddress, subscriptionName));
                        /*
                         * This future may have failed because the client closed an in-progress subscribe.
                         * See {@link #handleCloseConsumer(CommandCloseConsumer)}.
                         * Do not remove the failed future here; it will be removed once the previous
                         * subscribe has finished.
                         * Until the previous subscribe has finished, every new subscribe request will fail.
                         * This mechanism avoids more complex logic for handling the race conditions.
                         */
                        commandSender.sendErrorResponse(requestId, error,
                                "Consumer that failed is already present on the connection");
                    } else {
                        // The earlier request completed normally: the consumer already exists, so the
                        // duplicate request is acknowledged as a success.
                        Consumer consumer = existingConsumerFuture.getNow(null);
                        log.warn("[{}] Consumer with the same id is already created:"
                                        + " consumerId={}, consumer={}",
                                remoteAddress, consumerId, consumer);
                        commandSender.sendSuccessResponse(requestId);
                    }
                    return null;
                }

                // The topic is created on demand only when the client requested it (forceTopicCreation) AND
                // auto topic creation is allowed for this topic.
                service.isAllowAutoTopicCreationAsync(topicName.toString())
                        .thenApply(isAllowed -> forceTopicCreation && isAllowed)
                        .thenCompose(createTopicIfDoesNotExist ->
                                service.getTopic(topicName.toString(), createTopicIfDoesNotExist))
                        .thenCompose(optTopic -> {
                            if (!optTopic.isPresent()) {
                                return FutureUtil
                                        .failedFuture(new TopicNotFoundException(
                                                "Topic " + topicName + " does not exist"));
                            }
                            final Topic topic = optTopic.get();
                            // Check the max consumers limit first to avoid wasting resources on unnecessary
                            // operations. For example: if the topic has already reached its max consumers
                            // limit but the schema check ran first, that check would waste CPU.
                            if (((AbstractTopic) topic).isConsumersExceededOnTopic()) {
                                log.warn("[{}] Attempting to add consumer to topic which reached max"
                                        + " consumers limit", topic);
                                Throwable t =
                                        new ConsumerBusyException("Topic reached max consumers limit");
                                return FutureUtil.failedFuture(t);
                            }
                            return service.isAllowAutoSubscriptionCreationAsync(topicName)
                                    .thenCompose(isAllowedAutoSubscriptionCreation -> {
                                        // Reject only when all of these hold: the subscription is durable,
                                        // auto subscription creation is not allowed, the subscription does
                                        // not exist yet, and the topic is persistent.
                                        boolean rejectSubscriptionIfDoesNotExist = isDurable
                                                && !isAllowedAutoSubscriptionCreation
                                                && !topic.getSubscriptions().containsKey(subscriptionName)
                                                && topic.isPersistent();

                                        if (rejectSubscriptionIfDoesNotExist) {
                                            return FutureUtil
                                                    .failedFuture(
                                                            new SubscriptionNotFoundException(
                                                                    "Subscription does not exist"));
                                        }

                                        SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
                                                .subscriptionName(subscriptionName)
                                                .consumerId(consumerId).subType(subType)
                                                .priorityLevel(priorityLevel)
                                                .consumerName(consumerName).isDurable(isDurable)
                                                .startMessageId(startMessageId).metadata(metadata)
                                                .readCompacted(readCompacted)
                                                .initialPosition(initialPosition)
                                                .startMessageRollbackDurationSec(startMessageRollbackDurationSec)
                                                .replicatedSubscriptionStateArg(isReplicated)
                                                .keySharedMeta(keySharedMeta)
                                                .subscriptionProperties(subscriptionProperties)
                                                .consumerEpoch(consumerEpoch)
                                                .schemaType(schema == null ? null : schema.getType())
                                                .build();
                                        // A schema supplied by the client is added/checked before subscribing,
                                        // except for AUTO_CONSUME, which skips the schema step.
                                        if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
                                            return ignoreUnrecoverableBKException
                                                    (topic.addSchemaIfIdleOrCheckCompatible(schema))
                                                    .thenCompose(v -> topic.subscribe(option));
                                        } else {
                                            return topic.subscribe(option);
                                        }
                                    });
                        })
                        .thenAccept(consumer -> {
                            // NOTE(review): on this path the future is removed without being completed and no
                            // response is sent from this lambda; presumably checkAndApplyTopicMigration
                            // notifies the client itself - confirm.
                            if (consumer.checkAndApplyTopicMigration()) {
                                log.info("[{}] Disconnecting consumer {} on migrated subscription on topic {} / {}",
                                        remoteAddress, consumerId, subscriptionName, topicName);
                                consumers.remove(consumerId, consumerFuture);
                                return;
                            }

                            // complete() returns false when the future was already completed by someone else.
                            if (consumerFuture.complete(consumer)) {
                                log.info("[{}] Created subscription on topic {} / {}",
                                        remoteAddress, topicName, subscriptionName);
                                commandSender.sendSuccessResponse(requestId);
                                if (brokerInterceptor != null) {
                                    // Interceptor failures are logged and never propagated to the subscribe
                                    // flow.
                                    try {
                                        brokerInterceptor.consumerCreated(this, consumer, metadata);
                                    } catch (Throwable t) {
                                        log.error("Exception occur when intercept consumer created.", t);
                                    }
                                }
                            } else {
                                // The consumer future was already completed by a close command, so the
                                // consumer that was just created is closed again and the future is removed.
                                try {
                                    consumer.close();
                                    log.info("[{}] Cleared consumer created after timeout on client side {}",
                                            remoteAddress, consumer);
                                } catch (BrokerServiceException e) {
                                    log.warn(
                                            "[{}] Error closing consumer created"
                                                    + " after timeout on client side {}: {}",
                                            remoteAddress, consumer, e.getMessage());
                                }
                                consumers.remove(consumerId, consumerFuture);
                            }

                        })
                        .exceptionally(exception -> {
                            // NOTE(review): every branch below dereferences exception.getCause(); this assumes
                            // the throwable arrives wrapped (e.g. in a CompletionException) with a non-null
                            // cause - confirm.
                            if (exception.getCause() instanceof ConsumerBusyException) {
                                if (log.isDebugEnabled()) {
                                    log.debug(
                                            "[{}][{}][{}] Failed to create consumer because exclusive consumer"
                                                    + " is already connected: {}",
                                            remoteAddress, topicName, subscriptionName,
                                            exception.getCause().getMessage());
                                }
                            } else if (exception.getCause() instanceof BrokerServiceException.TopicMigratedException) {
                                Optional<ClusterUrl> clusterURL = getMigratedClusterUrl(service.getPulsar(),
                                        topicName.toString());
                                // With a known target cluster the client is redirected and the consumer is
                                // closed; without one, control falls through to the generic error handling
                                // at the end of this handler.
                                if (clusterURL.isPresent()) {
                                    log.info("[{}] redirect migrated consumer to topic {}: "
                                                    + "consumerId={}, subName={}, {}", remoteAddress,
                                            topicName, consumerId, subscriptionName, exception.getCause().getMessage());
                                    boolean msgSent = commandSender.sendTopicMigrated(ResourceType.Consumer, consumerId,
                                            clusterURL.get().getBrokerServiceUrl(),
                                            clusterURL.get().getBrokerServiceUrlTls());
                                    if (!msgSent) {
                                        log.info("consumer client doesn't support topic migration handling {}-{}-{}",
                                                topicName, remoteAddress, consumerId);
                                    }
                                    consumers.remove(consumerId, consumerFuture);
                                    closeConsumer(consumerId, Optional.empty());
                                    return null;
                                }
                            } else if (exception.getCause() instanceof BrokerServiceException) {
                                // Broker service exceptions are logged with their message only.
                                log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
                                         remoteAddress, topicName, subscriptionName,
                                         consumerId,  exception.getCause().getMessage());
                            } else {
                                // Any other failure: the throwable is passed as the trailing argument in
                                // addition to the message.
                                log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
                                         remoteAddress, topicName, subscriptionName,
                                         consumerId, exception.getCause().getMessage(), exception);
                            }

                            // If the client timed out, the future would already have been completed by a
                            // subsequent close. Send the error back to the client only if it was not already
                            // completed.
                            if (consumerFuture.completeExceptionally(exception)) {
                                commandSender.sendErrorResponse(requestId,
                                        BrokerServiceException.getClientErrorCode(exception.getCause()),
                                        exception.getCause().getMessage());
                            }
                            consumers.remove(consumerId, consumerFuture);

                            return null;

                        });
            } else {
                // Authorization check completed with "not allowed": drop the registered future and report
                // an authorization error.
                String msg = "Client is not authorized to subscribe";
                log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());
                consumers.remove(consumerId, consumerFuture);
                writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
            }
            return null;
        }).exceptionally(ex -> {
            // Reached when the authorization future fails or the callback above throws; it is reported to
            // the client as an authorization error in both cases.
            logAuthException(remoteAddress, "subscribe", getPrincipal(), Optional.of(topicName), ex);
            consumers.remove(consumerId, consumerFuture);
            commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
            return null;
        });
    }