public CompletableFuture connectionOpened()

in pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java [1847:2052]


    /**
     * Creates (or re-creates) this producer on the given broker connection.
     *
     * <p>The method moves the producer to the connecting state, registers it on {@code cnx}, sends a
     * "new producer" command and then reacts to the broker response:
     * <ul>
     *   <li>on success, the connection metadata, topic epoch, producer name and sequence id generator are
     *       updated and all pending messages are re-sent on {@code cnx};</li>
     *   <li>on failure, the producer is removed from {@code cnx} and, depending on the error, pending messages
     *       are failed, the producer is closed / marked terminated, fenced or failed, or the error is
     *       propagated through the returned future.</li>
     * </ul>
     *
     * @param cnx the connection on which the producer should be created
     * @return a future that completes normally once the attempt has finished (successfully, or with an error
     *         that was fully handled here), and completes exceptionally when the producer had already been
     *         created once or the error is retriable and the creation deadline has not passed yet
     *         (NOTE(review): presumably so the caller can schedule another attempt - confirm against caller)
     */
    public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
        previousExceptionCount.set(0);
        getConnectionHandler().setMaxMessageSize(cnx.getMaxMessageSize());
        setChunkMaxMessageSize();

        final long epoch;
        synchronized (this) {
            // Because the state could have been updated while retrieving the connection, we set it back to connecting,
            // as long as the change from current state to connecting is a valid state change.
            if (!changeToConnecting()) {
                if (getState() == State.Closing || getState() == State.Closed) {
                    // Producer was closed while reconnecting. Nothing has been registered on this cnx by this
                    // call yet, so it is enough to fail the messages that are still pending.
                    failPendingMessages(cnx,
                            new PulsarClientException.ProducerFencedException("producer has been closed"));
                }
                return CompletableFuture.completedFuture(null);
            }
            // We set the cnx reference before registering the producer on the cnx, so if the cnx breaks before creating
            // the producer, it will try to grab a new cnx. We also increment and get the epoch value for the producer.
            epoch = connectionHandler.switchClientCnx(cnx);
        }
        cnx.registerProducer(producerId, this);

        log.info("[{}] [{}] Creating producer on cnx {}", topic, producerName, cnx.ctx().channel());

        long requestId = client.newRequestId();

        // Only the first attempt (deadline still 0) sets the creation deadline; later reconnects keep it.
        PRODUCER_DEADLINE_UPDATER
                .compareAndSet(this, 0, System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs());

        // Select the schema info that is sent along with the "new producer" command.
        SchemaInfo schemaInfo = null;
        if (schema != null) {
            if (schema.getSchemaInfo() != null) {
                if (schema.getSchemaInfo().getType() == SchemaType.JSON) {
                    // for backwards compatibility purposes
                    // JSONSchema originally generated a schema for pojo based off of the JSON schema standard
                    // but now we have standardized on every schema to generate an Avro based schema
                    if (Commands.peerSupportJsonSchemaAvroFormat(cnx.getRemoteEndpointProtocolVersion())) {
                        schemaInfo = schema.getSchemaInfo();
                    } else if (schema instanceof JSONSchema) {
                        JSONSchema jsonSchema = (JSONSchema) schema;
                        schemaInfo = jsonSchema.getBackwardsCompatibleJsonSchemaInfo();
                    } else {
                        schemaInfo = schema.getSchemaInfo();
                    }
                } else if (schema.getSchemaInfo().getType() == SchemaType.BYTES
                        || schema.getSchemaInfo().getType() == SchemaType.NONE) {
                    // don't set schema info for Schema.BYTES
                    schemaInfo = null;
                } else {
                    schemaInfo = schema.getSchemaInfo();
                }
            }
        }

        final CompletableFuture<Void> future = new CompletableFuture<>();
        cnx.sendRequestWithId(
                Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled(), metadata,
                        schemaInfo, epoch, userProvidedProducerName,
                        conf.getAccessMode(), topicEpoch, client.conf.isEnableTransaction(),
                        conf.getInitialSubscriptionName()),
                requestId).thenAccept(response -> {
            // NOTE: this local shadows the producerName field; the field is only assigned below if still null.
            String producerName = response.getProducerName();
            long lastSequenceId = response.getLastSequenceId();
            schemaVersion = Optional.ofNullable(response.getSchemaVersion());
            schemaVersion.ifPresent(v -> schemaCache.put(SchemaHash.of(schema), v));

            // We are now reconnected to broker and clear to send messages. Re-send all pending messages and
            // set the cnx pointer so that new messages will be sent immediately
            synchronized (ProducerImpl.this) {
                State state = getState();
                if (state == State.Closing || state == State.Closed) {
                    // Producer was closed while reconnecting, close the connection to make sure the broker
                    // drops the producer on its side
                    cnx.removeProducer(producerId);
                    failPendingMessages(cnx,
                            new PulsarClientException.ProducerFencedException("producer has been closed"));
                    cnx.channel().close();
                    future.complete(null);
                    return;
                }
                resetBackoff();

                log.info("[{}] [{}] Created producer on cnx {}", topic, producerName, cnx.ctx().channel());
                connectionId = cnx.ctx().channel().toString();
                connectedSince = DateFormatter.now();
                if (conf.getAccessMode() != ProducerAccessMode.Shared && !topicEpoch.isPresent()) {
                    log.info("[{}] [{}] Producer epoch is {}", topic, producerName, response.getTopicEpoch());
                }
                topicEpoch = response.getTopicEpoch();

                if (this.producerName == null) {
                    this.producerName = producerName;
                }

                if (this.msgIdGenerator == 0 && conf.getInitialSequenceId() == null) {
                    // Only update sequence id generator if it wasn't already modified. That means we only want
                    // to update the id generator the first time the producer gets established, and ignore the
                    // sequence id sent by broker in subsequent producer reconnects
                    this.lastSequenceIdPublished = lastSequenceId;
                    this.msgIdGenerator = lastSequenceId + 1;
                }

                resendMessages(cnx, epoch);
            }
            future.complete(null);
        }).exceptionally((e) -> {
            // The failure normally arrives wrapped (the real error is the cause). Fall back to the throwable
            // itself when there is no cause so the checks and log statements below never dereference null,
            // which would leave the returned future uncompleted.
            final Throwable cause = e.getCause() != null ? e.getCause() : e;
            cnx.removeProducer(producerId);
            State state = getState();
            if (state == State.Closing || state == State.Closed) {
                // Producer was closed while reconnecting, close the connection to make sure the broker
                // drops the producer on its side
                cnx.channel().close();
                future.complete(null);
                return null;
            }

            if (cause instanceof TimeoutException) {
                // Creating the producer has timed out. We need to ensure the broker closes the producer
                // in case it was indeed created, otherwise it might prevent new create producer operation,
                // since we are not necessarily closing the connection.
                long closeRequestId = client.newRequestId();
                ByteBuf cmd = Commands.newCloseProducer(producerId, closeRequestId);
                cnx.sendRequestWithId(cmd, closeRequestId);
            }

            // Close the producer since topic does not exist.
            if (cause instanceof PulsarClientException.TopicDoesNotExistException) {
                closeAsync().whenComplete((v, ex) -> {
                    if (ex != null) {
                        log.error("Failed to close producer on TopicDoesNotExistException.", ex);
                    }
                    // The creation future is failed with the original error, whether or not the close succeeded.
                    producerCreatedFuture.completeExceptionally(cause);
                });
                future.complete(null);
                return null;
            }

            if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededException) {
                synchronized (this) {
                    log.warn("[{}] [{}] Topic backlog quota exceeded. Throwing Exception on producer.", topic,
                            producerName);

                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Pending messages: {}", topic, producerName,
                                pendingMessages.messagesCount());
                    }

                    PulsarClientException bqe = new PulsarClientException.ProducerBlockedQuotaExceededException(
                            format("The backlog quota of the topic %s that the producer %s produces to is exceeded",
                                    topic, producerName));
                    failPendingMessages(cnx(), bqe);
                }
            } else if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededError) {
                // Arguments are ordered (topic, producerName) like every other log statement in this method.
                log.warn("[{}] [{}] Producer is blocked on creation because backlog exceeded on topic.",
                        topic, producerName);
            } else if (PulsarClientException.isRetriableError(cause)) {
                log.info("[{}] [{}] Temporary error in creating producer: {}", topic, producerName, cause.getMessage());
            } else {
                log.error("[{}] [{}] Failed to create producer: {}", topic, producerName, cause.getMessage());
            }

            if (cause instanceof PulsarClientException.TopicTerminatedException) {
                setState(State.Terminated);
                synchronized (this) {
                    failPendingMessages(cnx(), (PulsarClientException) cause);
                }
                producerCreatedFuture.completeExceptionally(cause);
                closeProducerTasks();
                client.cleanupProducer(this);
            } else if (cause instanceof PulsarClientException.ProducerFencedException) {
                setState(State.ProducerFenced);
                synchronized (this) {
                    failPendingMessages(cnx(), (PulsarClientException) cause);
                }
                producerCreatedFuture.completeExceptionally(cause);
                closeProducerTasks();
                client.cleanupProducer(this);
            } else if (producerCreatedFuture.isDone() || (
                    cause instanceof PulsarClientException
                            && PulsarClientException.isRetriableError(cause)
                            && System.currentTimeMillis() < PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this)
            )) {
                // Either we had already created the producer once (producerCreatedFuture.isDone()) or we are
                // still within the initial timeout budget and we are dealing with a retriable error
                future.completeExceptionally(cause);
            } else {
                // Non-retriable error (or deadline exceeded) on the initial creation: give up on this producer.
                setState(State.Failed);
                producerCreatedFuture.completeExceptionally(cause);
                closeProducerTasks();
                client.cleanupProducer(this);
                Timeout timeout = sendTimeout;
                if (timeout != null) {
                    timeout.cancel();
                    sendTimeout = null;
                }
            }
            // Every branch that did not propagate the error completes the returned future normally.
            if (!future.isDone()) {
                future.complete(null);
            }
            return null;
        });
        return future;
    }