in pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java [1847:2052]
public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
previousExceptionCount.set(0);
getConnectionHandler().setMaxMessageSize(cnx.getMaxMessageSize());
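// Re-derive the per-chunk payload limit from the (possibly new) connection's max message size.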
setChunkMaxMessageSize();
final long epoch;
synchronized (this) {
// Because the state could have been updated while retrieving the connection, we set it back to Connecting,
// as long as the transition from the current state to Connecting is a valid state change.
if (!changeToConnecting()) {
if (getState() == State.Closing || getState() == State.Closed) {
// Producer was closed while reconnecting; fail the pending messages so their callbacks
// complete instead of waiting on a producer that will never be re-established
failPendingMessages(cnx,
new PulsarClientException.ProducerFencedException("producer has been closed"));
}
return CompletableFuture.completedFuture(null);
}
// We set the cnx reference before registering the producer on the cnx, so if the cnx breaks before creating
// the producer, it will try to grab a new cnx. We also increment and get the epoch value for the producer.
epoch = connectionHandler.switchClientCnx(cnx);
}
cnx.registerProducer(producerId, this);
log.info("[{}] [{}] Creating producer on cnx {}", topic, producerName, cnx.ctx().channel());
long requestId = client.newRequestId();
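// Record the creation deadline only on the first attempt (compareAndSet leaves an existing deadline untouched);
// it bounds how long retriable creation failures keep being retried in the error handling below.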
PRODUCER_DEADLINE_UPDATER
.compareAndSet(this, 0, System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs());
SchemaInfo schemaInfo = null;
if (schema != null) {
if (schema.getSchemaInfo() != null) {
if (schema.getSchemaInfo().getType() == SchemaType.JSON) {
// for backwards compatibility purposes
// JSONSchema originally generated a schema for POJOs based off the JSON schema standard,
// but we have since standardized on generating an Avro-based schema for every schema type
if (Commands.peerSupportJsonSchemaAvroFormat(cnx.getRemoteEndpointProtocolVersion())) {
schemaInfo = schema.getSchemaInfo();
} else if (schema instanceof JSONSchema) {
JSONSchema jsonSchema = (JSONSchema) schema;
schemaInfo = jsonSchema.getBackwardsCompatibleJsonSchemaInfo();
} else {
schemaInfo = schema.getSchemaInfo();
}
} else if (schema.getSchemaInfo().getType() == SchemaType.BYTES
|| schema.getSchemaInfo().getType() == SchemaType.NONE) {
// don't set schema info for Schema.BYTES or Schema.NONE
schemaInfo = null;
} else {
schemaInfo = schema.getSchemaInfo();
}
}
}
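// Issue the CommandProducer request. The returned `future` completes once this connection attempt has been
// fully handled; it completes exceptionally only when the error should trigger another reconnect attempt.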
final CompletableFuture<Void> future = new CompletableFuture<>();
cnx.sendRequestWithId(
Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled(), metadata,
schemaInfo, epoch, userProvidedProducerName,
conf.getAccessMode(), topicEpoch, client.conf.isEnableTransaction(),
conf.getInitialSubscriptionName()),
requestId).thenAccept(response -> {
String producerName = response.getProducerName();
long lastSequenceId = response.getLastSequenceId();
schemaVersion = Optional.ofNullable(response.getSchemaVersion());
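// Cache the broker-assigned schema version for this schema so later publishes can attach it
// without re-registering the schema.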
schemaVersion.ifPresent(v -> schemaCache.put(SchemaHash.of(schema), v));
// We are now reconnected to the broker and clear to send messages. Re-send all pending messages and
// set the cnx pointer so that new messages will be sent immediately
synchronized (ProducerImpl.this) {
State state = getState();
if (state == State.Closing || state == State.Closed) {
// Producer was closed while reconnecting, close the connection to make sure the broker
// drops the producer on its side
cnx.removeProducer(producerId);
failPendingMessages(cnx,
new PulsarClientException.ProducerFencedException("producer has been closed"));
cnx.channel().close();
future.complete(null);
return;
}
resetBackoff();
log.info("[{}] [{}] Created producer on cnx {}", topic, producerName, cnx.ctx().channel());
connectionId = cnx.ctx().channel().toString();
connectedSince = DateFormatter.now();
if (conf.getAccessMode() != ProducerAccessMode.Shared && !topicEpoch.isPresent()) {
log.info("[{}] [{}] Producer epoch is {}", topic, producerName, response.getTopicEpoch());
}
topicEpoch = response.getTopicEpoch();
if (this.producerName == null) {
this.producerName = producerName;
}
if (this.msgIdGenerator == 0 && conf.getInitialSequenceId() == null) {
// Only update the sequence id generator if it wasn't already modified. That means we only want
// to update the id generator the first time the producer gets established, and ignore the
// sequence id sent by the broker on subsequent producer reconnects
this.lastSequenceIdPublished = lastSequenceId;
this.msgIdGenerator = lastSequenceId + 1;
}
resendMessages(cnx, epoch);
}
future.complete(null);
}).exceptionally((e) -> {
Throwable cause = e.getCause();
cnx.removeProducer(producerId);
State state = getState();
if (state == State.Closing || state == State.Closed) {
// Producer was closed while reconnecting, close the connection to make sure the broker
// drops the producer on its side
cnx.channel().close();
future.complete(null);
return null;
}
if (cause instanceof TimeoutException) {
// Creating the producer has timed out. We need to ensure the broker closes the producer
// in case it was indeed created; otherwise it might block subsequent create-producer
// operations, since we are not necessarily closing the connection.
long closeRequestId = client.newRequestId();
ByteBuf cmd = Commands.newCloseProducer(producerId, closeRequestId);
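// fire-and-forget: the CloseProducer response is not awaited before continuing with the error handling below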
cnx.sendRequestWithId(cmd, closeRequestId);
}
// Close the producer since the topic does not exist.
if (cause instanceof PulsarClientException.TopicDoesNotExistException) {
closeAsync().whenComplete((v, ex) -> {
if (ex != null) {
log.error("Failed to close producer on TopicDoesNotExistException.", ex);
}
producerCreatedFuture.completeExceptionally(cause);
});
future.complete(null);
return null;
}
if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededException) {
synchronized (this) {
log.warn("[{}] [{}] Topic backlog quota exceeded. Throwing Exception on producer.", topic,
producerName);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Pending messages: {}", topic, producerName,
pendingMessages.messagesCount());
}
PulsarClientException bqe = new PulsarClientException.ProducerBlockedQuotaExceededException(
format("The backlog quota of the topic %s that the producer %s produces to is exceeded",
topic, producerName));
failPendingMessages(cnx(), bqe);
}
} else if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededError) {
log.warn("[{}] [{}] Producer is blocked on creation because backlog exceeded on topic.",
producerName, topic);
} else if (PulsarClientException.isRetriableError(cause)) {
log.info("[{}] [{}] Temporary error in creating producer: {}", topic, producerName, cause.getMessage());
} else {
log.error("[{}] [{}] Failed to create producer: {}", topic, producerName, cause.getMessage());
}
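// TopicTerminated and ProducerFenced are terminal: fail pending messages, report the error on
// producerCreatedFuture and clean up the producer.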
if (cause instanceof PulsarClientException.TopicTerminatedException) {
setState(State.Terminated);
synchronized (this) {
failPendingMessages(cnx(), (PulsarClientException) cause);
}
producerCreatedFuture.completeExceptionally(cause);
closeProducerTasks();
client.cleanupProducer(this);
} else if (cause instanceof PulsarClientException.ProducerFencedException) {
setState(State.ProducerFenced);
synchronized (this) {
failPendingMessages(cnx(), (PulsarClientException) cause);
}
producerCreatedFuture.completeExceptionally(cause);
closeProducerTasks();
client.cleanupProducer(this);
} else if (producerCreatedFuture.isDone() || (
cause instanceof PulsarClientException
&& PulsarClientException.isRetriableError(cause)
&& System.currentTimeMillis() < PRODUCER_DEADLINE_UPDATER.get(ProducerImpl.this)
)) {
// Either we had already created the producer once (producerCreatedFuture.isDone()) or we are
// still within the initial timeout budget and we are dealing with a retriable error
future.completeExceptionally(cause);
} else {
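// The producer was never created and the error is either non-retriable or past the creation deadline:
// mark the producer Failed, clean up and cancel the send timeout task.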
setState(State.Failed);
producerCreatedFuture.completeExceptionally(cause);
closeProducerTasks();
client.cleanupProducer(this);
Timeout timeout = sendTimeout;
if (timeout != null) {
timeout.cancel();
sendTimeout = null;
}
}
if (!future.isDone()) {
future.complete(null);
}
return null;
});
return future;
}