in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java [1504:1760]
protected void handleProducer(final CommandProducer cmdProducer) {
checkArgument(state == State.Connected);
final long producerId = cmdProducer.getProducerId();
final long requestId = cmdProducer.getRequestId();
// Use producer name provided by client if present
final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
: service.generateUniqueProducerName();
final long epoch = cmdProducer.getEpoch();
final boolean userProvidedProducerName = cmdProducer.isUserProvidedProducerName();
final boolean isEncrypted = cmdProducer.isEncrypted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
final SchemaData schema = cmdProducer.hasSchema() ? getSchema(cmdProducer.getSchema()) : null;
final ProducerAccessMode producerAccessMode = cmdProducer.getProducerAccessMode();
final Optional<Long> topicEpoch = cmdProducer.hasTopicEpoch()
? Optional.of(cmdProducer.getTopicEpoch()) : Optional.empty();
final boolean isTxnEnabled = cmdProducer.isTxnEnabled();
final String initialSubscriptionName =
cmdProducer.hasInitialSubscriptionName() ? cmdProducer.getInitialSubscriptionName() : null;
final boolean supportsPartialProducer = supportsPartialProducer();
final TopicName topicName = validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer);
if (topicName == null) {
return;
}
CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
topicName, TopicOperation.PRODUCE, authenticationData, originalAuthData
);
if (!Strings.isNullOrEmpty(initialSubscriptionName)) {
isAuthorizedFuture =
isAuthorizedFuture.thenCombine(
isTopicOperationAllowed(topicName, initialSubscriptionName, TopicOperation.SUBSCRIBE),
(canProduce, canSubscribe) -> canProduce && canSubscribe);
}
isAuthorizedFuture.thenApply(isAuthorized -> {
if (!isAuthorized) {
String msg = "Client is not authorized to Produce";
log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());
writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
return null;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Client is authorized to Produce with role {}", remoteAddress, getPrincipal());
}
CompletableFuture<Producer> producerFuture = new CompletableFuture<>();
CompletableFuture<Producer> existingProducerFuture = producers.putIfAbsent(producerId, producerFuture);
if (existingProducerFuture != null) {
if (!existingProducerFuture.isDone()) {
// There was an early request to create a producer with same producerId.
// This can happen when client timeout is lower than the broker timeouts.
// We need to wait until the previous producer creation request
// either complete or fails.
log.warn("[{}][{}] Producer with id is already present on the connection, producerId={}",
remoteAddress, topicName, producerId);
commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
"Producer is already present on the connection");
} else if (existingProducerFuture.isCompletedExceptionally()) {
// remove producer with producerId as it's already completed with exception
log.warn("[{}][{}] Producer with id is failed to register present on the connection, producerId={}",
remoteAddress, topicName, producerId);
ServerError error = getErrorCode(existingProducerFuture);
producers.remove(producerId, existingProducerFuture);
commandSender.sendErrorResponse(requestId, error,
"Producer is already failed to register present on the connection");
} else {
Producer producer = existingProducerFuture.getNow(null);
log.info("[{}] [{}] Producer with the same id is already created:"
+ " producerId={}, producer={}", remoteAddress, topicName, producerId, producer);
commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(),
producer.getSchemaVersion());
}
return null;
}
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Creating producer. producerId={}, producerName={}, schema is {}", remoteAddress,
topicName, producerId, producerName, schema == null ? "absent" : "present");
}
service.getOrCreateTopic(topicName.toString()).thenCompose((Topic topic) -> {
// Check max producer limitation to avoid unnecessary ops wasting resources. For example: the new
// producer reached max producer limitation, but pulsar did schema check first, it would waste CPU
if (((AbstractTopic) topic).isProducersExceeded(producerName)) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
String errorMsg = "Topic '" + topicName.toString() + "' reached max producers limit";
Throwable t = new BrokerServiceException.ProducerBusyException(errorMsg);
return CompletableFuture.failedFuture(t);
}
// Before creating producer, check if backlog quota exceeded
// on topic for size based limit and time based limit
CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.destination_storage),
topic.checkBacklogQuotaExceeded(producerName, BacklogQuotaType.message_age));
backlogQuotaCheckFuture.thenRun(() -> {
// Check whether the producer will publish encrypted messages or not
if ((topic.isEncryptionRequired() || encryptionRequireOnProducer)
&& !isEncrypted
&& !SystemTopicNames.isSystemTopic(topicName)) {
String msg = String.format("Encryption is required in %s", topicName);
log.warn("[{}] {}", remoteAddress, msg);
if (producerFuture.completeExceptionally(new ServerMetadataException(msg))) {
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
}
producers.remove(producerId, producerFuture);
return;
}
disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema);
schemaVersionFuture.exceptionally(exception -> {
if (producerFuture.completeExceptionally(exception)) {
String message = exception.getMessage();
if (exception.getCause() != null) {
message += (" caused by " + exception.getCause());
}
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(exception),
message);
}
var cause = FutureUtil.unwrapCompletionException(exception);
if (!(cause instanceof IncompatibleSchemaException)) {
log.error("Try add schema failed, remote address {}, topic {}, producerId {}",
remoteAddress,
topicName, producerId, exception);
}
producers.remove(producerId, producerFuture);
return null;
});
schemaVersionFuture.thenAccept(schemaVersion -> {
CompletionStage<Subscription> createInitSubFuture;
if (!Strings.isNullOrEmpty(initialSubscriptionName)
&& topic.isPersistent()
&& !topic.getSubscriptions().containsKey(initialSubscriptionName)) {
createInitSubFuture = service.isAllowAutoSubscriptionCreationAsync(topicName)
.thenCompose(isAllowAutoSubscriptionCreation -> {
if (!isAllowAutoSubscriptionCreation) {
return CompletableFuture.failedFuture(
new BrokerServiceException.NotAllowedException(
"Could not create the initial subscription due to the "
+ "auto subscription creation is not allowed."));
}
return topic.createSubscription(initialSubscriptionName,
InitialPosition.Earliest, false, null);
});
} else {
createInitSubFuture = CompletableFuture.completedFuture(null);
}
createInitSubFuture.whenComplete((sub, ex) -> {
if (ex != null) {
final Throwable rc = FutureUtil.unwrapCompletionException(ex);
if (rc instanceof BrokerServiceException.NotAllowedException) {
log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
remoteAddress, rc.getMessage(), initialSubscriptionName, topicName);
if (producerFuture.completeExceptionally(rc)) {
commandSender.sendErrorResponse(requestId,
ServerError.NotAllowedError, rc.getMessage());
}
producers.remove(producerId, producerFuture);
return;
}
String msg =
"Failed to create the initial subscription: " + ex.getCause().getMessage();
log.warn("[{}] {} initialSubscriptionName: {}, topic: {}",
remoteAddress, msg, initialSubscriptionName, topicName);
if (producerFuture.completeExceptionally(ex)) {
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(ex), msg);
}
producers.remove(producerId, producerFuture);
return;
}
buildProducerAndAddTopic(topic, producerId, producerName, requestId, isEncrypted,
metadata, schemaVersion, epoch, userProvidedProducerName, topicName,
producerAccessMode, topicEpoch, supportsPartialProducer, producerFuture);
});
});
});
return backlogQuotaCheckFuture;
}).exceptionally(exception -> {
Throwable cause = exception.getCause();
if (cause instanceof BrokerServiceException.TopicBacklogQuotaExceededException) {
BrokerServiceException.TopicBacklogQuotaExceededException tbqe =
(BrokerServiceException.TopicBacklogQuotaExceededException) cause;
IllegalStateException illegalStateException = new IllegalStateException(tbqe);
BacklogQuota.RetentionPolicy retentionPolicy = tbqe.getRetentionPolicy();
if (producerFuture.completeExceptionally(illegalStateException)) {
if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededError,
illegalStateException.getMessage());
} else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededException,
illegalStateException.getMessage());
}
}
producers.remove(producerId, producerFuture);
return null;
} else if (cause instanceof BrokerServiceException.TopicMigratedException) {
Optional<ClusterUrl> clusterURL = getMigratedClusterUrl(service.getPulsar(), topicName.toString());
if (clusterURL.isPresent()) {
log.info("[{}] redirect migrated producer to topic {}: "
+ "producerId={}, producerName = {}, {}", remoteAddress,
topicName, producerId, producerName, cause.getMessage());
boolean msgSent = commandSender.sendTopicMigrated(ResourceType.Producer, producerId,
clusterURL.get().getBrokerServiceUrl(), clusterURL.get().getBrokerServiceUrlTls());
if (!msgSent) {
log.info("client doesn't support topic migration handling {}-{}-{}", topicName,
remoteAddress, producerId);
}
producers.remove(producerId, producerFuture);
closeProducer(producerId, -1L, Optional.empty());
return null;
}
}
// Do not print stack traces for expected exceptions
if (cause instanceof NoSuchElementException) {
cause = new TopicNotFoundException(String.format("Topic not found %s", topicName.toString()));
log.warn("[{}] Failed to load topic {}, producerId={}: Topic not found", remoteAddress, topicName,
producerId);
} else if (!Exceptions.areExceptionsPresentInChain(cause,
ServiceUnitNotReadyException.class, ManagedLedgerException.class,
BrokerServiceException.ProducerBusyException.class)) {
log.error("[{}] Failed to create topic {}, producerId={}",
remoteAddress, topicName, producerId, exception);
}
// If client timed out, the future would have been completed
// by subsequent close. Send error back to
// client, only if not completed already.
if (producerFuture.completeExceptionally(exception)) {
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(cause), cause.getMessage());
}
producers.remove(producerId, producerFuture);
return null;
});
return null;
}).exceptionally(ex -> {
logAuthException(remoteAddress, "producer", getPrincipal(), Optional.of(topicName), ex);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
return null;
});
}