protected void handleProducer()

in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java [1504:1760]


    protected void handleProducer(final CommandProducer cmdProducer) {
        checkArgument(state == State.Connected);
        final long producerId = cmdProducer.getProducerId();
        final long requestId = cmdProducer.getRequestId();
        // Use producer name provided by client if present
        final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
                : service.generateUniqueProducerName();
        final long epoch = cmdProducer.getEpoch();
        final boolean userProvidedProducerName = cmdProducer.isUserProvidedProducerName();
        final boolean isEncrypted = cmdProducer.isEncrypted();
        final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
        final SchemaData schema = cmdProducer.hasSchema() ? getSchema(cmdProducer.getSchema()) : null;

        final ProducerAccessMode producerAccessMode = cmdProducer.getProducerAccessMode();
        final Optional<Long> topicEpoch = cmdProducer.hasTopicEpoch()
                ? Optional.of(cmdProducer.getTopicEpoch()) : Optional.empty();
        final boolean isTxnEnabled = cmdProducer.isTxnEnabled();
        final String initialSubscriptionName =
                cmdProducer.hasInitialSubscriptionName() ? cmdProducer.getInitialSubscriptionName() : null;
        final boolean supportsPartialProducer = supportsPartialProducer();

        final TopicName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer);
        if (topicName == null) {
            return;
        }

        CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
                topicName, TopicOperation.PRODUCE, authenticationData, originalAuthData
        );

        if (!Strings.isNullOrEmpty(initialSubscriptionName)) {
            isAuthorizedFuture =
                    isAuthorizedFuture.thenCombine(
                            isTopicOperationAllowed(topicName, initialSubscriptionName, TopicOperation.SUBSCRIBE),
                            (canProduce, canSubscribe) -> canProduce && canSubscribe);
        }

        isAuthorizedFuture.thenApply(isAuthorized -> {
            if (!isAuthorized) {
                String msg = "Client is not authorized to Produce";
                log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());
                writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
                return null;
            }

            if (log.isDebugEnabled()) {
                log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, getPrincipal());
            }
            CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
            CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId, producerFuture);

            if (existingProducerFuture != null) {
                if (!existingProducerFuture.isDone()) {
                    // There was an early request to create a producer with same producerId.
                    // This can happen when client timeout is lower than the broker timeouts.
                    // We need to wait until the previous producer creation request
                    // either complete or fails.
                    log.warn("[{}][{}] Producer with id is already present on the connection, producerId={}",
                            remoteAddress, topicName, producerId);
                    commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
                            "Producer is already present on the connection");
                } else if (existingProducerFuture.isCompletedExceptionally()) {
                    // remove producer with producerId as it's already completed with exception
                    log.warn("[{}][{}] Producer with id is failed to register present on the connection, producerId={}",
                            remoteAddress, topicName, producerId);
                    ServerError error = getErrorCode(existingProducerFuture);
                    producers.remove(producerId, existingProducerFuture);
                    commandSender.sendErrorResponse(requestId, error,
                            "Producer is already failed to register present on the connection");
                } else {
                    Producer producer = existingProducerFuture.getNow(null);
                    log.info("[{}] [{}] Producer with the same id is already created:"
                            + " producerId={}, producer={}", remoteAddress, topicName, producerId, producer);
                    commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(),
                            producer.getSchemaVersion());
                }
                return null;
            }

            if (log.isDebugEnabled()) {
                log.debug("[{}][{}] Creating producer. producerId={}, producerName={}, schema is {}", remoteAddress,
                        topicName, producerId, producerName, schema == null ? "absent" : "present");
            }

            service.getOrCreateTopic(topicName.toString()).thenCompose((Topic topic) -> {
                // Check max producer limitation to avoid unnecessary ops wasting resources. For example: the new
                // producer reached max producer limitation, but pulsar did schema check first, it would waste CPU
                if (((AbstractTopic) topic).isProducersExceeded(producerName)) {
                    log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
                    String errorMsg = "Topic '" + topicName.toString() + "' reached max producers limit";
                    Throwable t = new BrokerServiceException.ProducerBusyException(errorMsg);
                    return CompletableFuture.failedFuture(t);
                }

                // Before creating producer, check if backlog quota exceeded
                // on topic for size based limit and time based limit
                CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.destination_storage),
                        topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));

                backlogQuotaCheckFuture.thenRun(() -> {
                    // Check whether the producer will publish encrypted messages or not
                    if ((topic.isEncryptionRequired() || encryptionRequireOnProducer)
                            && !isEncrypted
                            && !SystemTopicNames.isSystemTopic(topicName)) {
                        String msg = String.format("Encryption is required in %s", topicName);
                        log.warn("[{}] {}", remoteAddress, msg);
                        if (producerFuture.completeExceptionally(new ServerMetadataException(msg))) {
                            commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
                        }
                        producers.remove(producerId, producerFuture);
                        return;
                    }

                    disableTcpNoDelayIfNeeded(topicName.toString(), producerName);

                    CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema);

                    schemaVersionFuture.exceptionally(exception -> {
                        if (producerFuture.completeExceptionally(exception)) {
                            String message = exception.getMessage();
                            if (exception.getCause() != null) {
                                message += (" caused by " + exception.getCause());
                            }
                            commandSender.sendErrorResponse(requestId,
                                    BrokerServiceException.getClientErrorCode(exception),
                                    message);
                        }
                        var cause = FutureUtil.unwrapCompletionException(exception);
                        if (!(cause instanceof IncompatibleSchemaException)) {
                            log.error("Try add schema failed, remote address {}, topic {}, producerId {}",
                                    remoteAddress,
                                    topicName, producerId, exception);
                        }
                        producers.remove(producerId, producerFuture);
                        return null;
                    });

                    schemaVersionFuture.thenAccept(schemaVersion -> {
                        CompletionStage<Subscription> createInitSubFuture;
                        if (!Strings.isNullOrEmpty(initialSubscriptionName)
                                && topic.isPersistent()
                                && !topic.getSubscriptions().containsKey(initialSubscriptionName)) {
                            createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName)
                                    .thenCompose(isAllowAutoSubscriptionCreation -> {
                                        if (!isAllowAutoSubscriptionCreation) {
                                            return CompletableFuture.failedFuture(
                                                    new BrokerServiceException.NotAllowedException(
                                                            "Could not create the initial subscription due to the "
                                                                    + "auto subscription creation is not allowed."));
                                        }
                                        return topic.createSubscription(initialSubscriptionName,
                                                InitialPosition.Earliest, false, null);
                                    });
                        } else {
                            createInitSubFuture = CompletableFuture.completedFuture(null);
                        }

                        createInitSubFuture.whenComplete((sub, ex) -> {
                            if (ex != null) {
                                final Throwable rc = FutureUtil.unwrapCompletionException(ex);
                                if (rc instanceof BrokerServiceException.NotAllowedException) {
                                    log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
                                            remoteAddress, rc.getMessage(), initialSubscriptionName, topicName);
                                    if (producerFuture.completeExceptionally(rc)) {
                                        commandSender.sendErrorResponse(requestId,
                                                ServerError.NotAllowedError, rc.getMessage());
                                    }
                                    producers.remove(producerId, producerFuture);
                                    return;
                                }
                                String msg =
                                        "Failed to create the initial subscription: " + ex.getCause().getMessage();
                                log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
                                        remoteAddress, msg, initialSubscriptionName, topicName);
                                if (producerFuture.completeExceptionally(ex)) {
                                    commandSender.sendErrorResponse(requestId,
                                            BrokerServiceException.getClientErrorCode(ex), msg);
                                }
                                producers.remove(producerId, producerFuture);
                                return;
                            }

                            buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted,
                                    metadata, schemaVersion, epoch, userProvidedProducerName, topicName,
                                    producerAccessMode, topicEpoch, supportsPartialProducer, producerFuture);
                        });
                    });
                });
                return backlogQuotaCheckFuture;
            }).exceptionally(exception -> {
                Throwable cause = exception.getCause();
                if (cause instanceof BrokerServiceException.TopicBacklogQuotaExceededException) {
                    BrokerServiceException.TopicBacklogQuotaExceededException tbqe =
                            (BrokerServiceException.TopicBacklogQuotaExceededException) cause;
                    IllegalStateException illegalStateException = new IllegalStateException(tbqe);
                    BacklogQuota.RetentionPolicy retentionPolicy = tbqe.getRetentionPolicy();
                    if (producerFuture.completeExceptionally(illegalStateException)) {
                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
                            commandSender.sendErrorResponse(requestId,
                                    ServerError.ProducerBlockedQuotaExceededError,
                                    illegalStateException.getMessage());
                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
                            commandSender.sendErrorResponse(requestId,
                                    ServerError.ProducerBlockedQuotaExceededException,
                                    illegalStateException.getMessage());
                        }
                    }
                    producers.remove(producerId, producerFuture);
                    return null;
                } else if (cause instanceof BrokerServiceException.TopicMigratedException) {
                    Optional<ClusterUrl> clusterURL = getMigratedClusterUrl(service.getPulsar(), topicName.toString());
                    if (clusterURL.isPresent()) {
                        log.info("[{}] redirect migrated producer to topic {}: "
                                        + "producerId={}, producerName = {}, {}", remoteAddress,
                                topicName, producerId, producerName, cause.getMessage());
                        boolean msgSent = commandSender.sendTopicMigrated(ResourceType.Producer, producerId,
                                clusterURL.get().getBrokerServiceUrl(), clusterURL.get().getBrokerServiceUrlTls());
                        if (!msgSent) {
                            log.info("client doesn't support topic migration handling {}-{}-{}", topicName,
                                    remoteAddress, producerId);
                        }
                        producers.remove(producerId, producerFuture);
                        closeProducer(producerId, -1L, Optional.empty());
                        return null;
                    }
                }

                // Do not print stack traces for expected exceptions
                if (cause instanceof NoSuchElementException) {
                    cause = new TopicNotFoundException(String.format("Topic not found %s", topicName.toString()));
                    log.warn("[{}] Failed to load topic {}, producerId={}: Topic not found", remoteAddress, topicName,
                            producerId);
                } else if (!Exceptions.areExceptionsPresentInChain(cause,
                        ServiceUnitNotReadyException.class, ManagedLedgerException.class,
                        BrokerServiceException.ProducerBusyException.class)) {
                    log.error("[{}] Failed to create topic {}, producerId={}",
                            remoteAddress, topicName, producerId, exception);
                }

                // If client timed out, the future would have been completed
                // by subsequent close. Send error back to
                // client, only if not completed already.
                if (producerFuture.completeExceptionally(exception)) {
                    commandSender.sendErrorResponse(requestId,
                            BrokerServiceException.getClientErrorCode(cause), cause.getMessage());
                }
                producers.remove(producerId, producerFuture);
                return null;
            });
            return null;
        }).exceptionally(ex -> {
            logAuthException(remoteAddress, "producer", getPrincipal(), Optional.of(topicName), ex);
            commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
            return null;
        });
    }