in pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java [1216:1486]
// Handles a CommandSubscribe request received from the client on this connection.
//
// Flow, as implemented below:
//  1. Assert that the connection is in State.Connected and validate the topic name; stop if
//     validateTopicName returns null.
//  2. Read the subscription options from the command, applying defaults for unset optional fields.
//  3. Start the TopicOperation.CONSUME authorization check, then register a fresh consumer future
//     in `consumers` under the consumer id via putIfAbsent (an earlier future for the same id is
//     kept and returned instead).
//  4. When the authorization result arrives: validate the consumer metadata, answer directly if a
//     future for this consumer id was already registered, otherwise resolve the topic, apply the
//     consumer-limit / auto-subscription-creation / schema checks and call topic.subscribe().
//  5. Send a success response when this request completes the consumer future, or an error
//     response on failure. The failure paths for this request's own future call
//     consumers.remove(consumerId, consumerFuture).
//
// The method returns right after wiring the callbacks; all replies are sent from those callbacks.
//
// subscribe: the subscribe command received from the client.
protected void handleSubscribe(final CommandSubscribe subscribe) {
// Subscribe is only processed on a connection that is in the Connected state.
checkArgument(state == State.Connected);
final long requestId = subscribe.getRequestId();
final long consumerId = subscribe.getConsumerId();
// A null result ends the request here; presumably validateTopicName has already replied to the
// client in that case - TODO confirm against its implementation.
TopicName topicName = validateTopicName(subscribe.getTopic(), requestId, subscribe);
if (topicName == null) {
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Handle subscribe command: auth role = {}, original auth role = {}",
remoteAddress, authRole, originalPrincipal);
}
final String subscriptionName = subscribe.getSubscription();
final SubType subType = subscribe.getSubType();
// Consumer name defaults to the empty string when the client did not send one.
final String consumerName = subscribe.hasConsumerName() ? subscribe.getConsumerName() : "";
final boolean isDurable = subscribe.isDurable();
// Built as a BatchMessageIdImpl only when the command carries a start message id; null otherwise.
final MessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(
subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
: null;
final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
final boolean readCompacted = subscribe.hasReadCompacted() && subscribe.isReadCompacted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
final InitialPosition initialPosition = subscribe.getInitialPosition();
// -1 is used when the command carries no rollback duration.
final long startMessageRollbackDurationSec = subscribe.hasStartMessageRollbackDurationSec()
? subscribe.getStartMessageRollbackDurationSec()
: -1;
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
// Boxed so that null can represent "flag not set by the client".
final Boolean isReplicated =
subscribe.hasReplicateSubscriptionState() ? subscribe.isReplicateSubscriptionState() : null;
final boolean forceTopicCreation = subscribe.isForceTopicCreation();
// Copies the key-shared metadata from the command, or falls back to emptyKeySharedMeta.
final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta()
? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta())
: emptyKeySharedMeta;
final long consumerEpoch = subscribe.hasConsumerEpoch() ? subscribe.getConsumerEpoch() : DEFAULT_CONSUMER_EPOCH;
final Optional<Map<String, String>> subscriptionProperties = SubscriptionOption.getPropertiesMap(
subscribe.getSubscriptionPropertiesList());
if (log.isDebugEnabled()) {
log.debug("Topic name = {}, subscription name = {}, schema is {}", topicName, subscriptionName,
schema == null ? "absent" : "present");
}
// Start the authorization check for consuming from this topic / subscription.
CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
topicName,
subscriptionName,
TopicOperation.CONSUME
);
// Make sure the consumer future is put into the consumers map first, to avoid the same consumer
// epoch using different consumer futures, and only remove the consumer future from the map
// if the subscribe failed.
CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
// Non-null only if a future was already registered for this consumer id; in that case the new
// consumerFuture above was NOT inserted into the map.
CompletableFuture<Consumer> existingConsumerFuture =
consumers.putIfAbsent(consumerId, consumerFuture);
isAuthorizedFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
log.debug("[{}] Client is authorized to subscribe with role {}",
remoteAddress, getPrincipal());
}
log.info("[{}] Subscribing on topic {} / {}. consumerId: {}, role: {}", this.toString(), topicName,
subscriptionName, consumerId, getPrincipal());
// Validate the consumer metadata against the configured maximum consumer metadata size before
// doing any topic work; a violation is reported to the client as a MetadataError.
try {
Metadata.validateMetadata(metadata,
service.getPulsar().getConfiguration().getMaxConsumerMetadataSize());
} catch (IllegalArgumentException iae) {
final String msg = iae.getMessage();
// Two-argument remove: targets the entry only if it still maps to this request's future.
consumers.remove(consumerId, consumerFuture);
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
return null;
}
// Duplicate consumer id on this connection: answer according to the state of the future that
// was registered first, without attempting a new subscription.
if (existingConsumerFuture != null) {
if (!existingConsumerFuture.isDone()){
// There was an earlier request to create a consumer with the same consumerId. This can
// happen when the client timeout is lower than the broker timeouts. We need to wait until
// the previous consumer creation request either completes or fails.
log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
+ " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
"Consumer is already present on the connection");
} else if (existingConsumerFuture.isCompletedExceptionally()){
log.warn("[{}][{}][{}] A failed consumer with id is already present on the connection,"
+ " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true,
String.format("A failed consumer with id is already present on the connection."
+ " consumerId: %s, remoteAddress: %s, subscription: %s",
consumerId, remoteAddress, subscriptionName));
/**
 * This future may have been failed because the client closed an in-progress subscribe.
 * See {@link #handleCloseConsumer(CommandCloseConsumer)}
 * Do not remove the failed future at this line; it will be removed once the previous
 * subscribe attempt has finished.
 * Until the previous subscribe attempt has finished, new subscribe requests will always fail.
 * This mechanism avoids more complex logic for handling the race conditions.
 */
commandSender.sendErrorResponse(requestId, error,
"Consumer that failed is already present on the connection");
} else {
// The earlier future completed normally: the consumer already exists, so report success.
Consumer consumer = existingConsumerFuture.getNow(null);
log.warn("[{}] Consumer with the same id is already created:"
+ " consumerId={}, consumer={}",
remoteAddress, consumerId, consumer);
commandSender.sendSuccessResponse(requestId);
}
return null;
}
// The topic may be created during lookup only if the client requested it (forceTopicCreation)
// AND auto topic creation is allowed for this topic.
service.isAllowAutoTopicCreationAsync(topicName.toString())
.thenApply(isAllowed -> forceTopicCreation && isAllowed)
.thenCompose(createTopicIfDoesNotExist ->
service.getTopic(topicName.toString(), createTopicIfDoesNotExist))
.thenCompose(optTopic -> {
if (!optTopic.isPresent()) {
return FutureUtil
.failedFuture(new TopicNotFoundException(
"Topic " + topicName + " does not exist"));
}
final Topic topic = optTopic.get();
// Check the max consumer limitation first to avoid unnecessary ops wasting resources. For
// example: if the new consumer reached the max consumer limitation but Pulsar did the
// schema check first, it would waste CPU.
if (((AbstractTopic) topic).isConsumersExceededOnTopic()) {
log.warn("[{}] Attempting to add consumer to topic which reached max"
+ " consumers limit", topic);
Throwable t =
new ConsumerBusyException("Topic reached max consumers limit");
return FutureUtil.failedFuture(t);
}
return service.isAllowAutoSubscriptionCreationAsync(topicName)
.thenCompose(isAllowedAutoSubscriptionCreation -> {
// Reject only when all four hold: the subscription is durable, auto subscription
// creation is not allowed, the subscription does not exist yet, and the topic is
// persistent.
boolean rejectSubscriptionIfDoesNotExist = isDurable
&& !isAllowedAutoSubscriptionCreation
&& !topic.getSubscriptions().containsKey(subscriptionName)
&& topic.isPersistent();
if (rejectSubscriptionIfDoesNotExist) {
return FutureUtil
.failedFuture(
new SubscriptionNotFoundException(
"Subscription does not exist"));
}
// Bundle every option read from the command into the SubscriptionOption for the topic.
SubscriptionOption option = SubscriptionOption.builder().cnx(ServerCnx.this)
.subscriptionName(subscriptionName)
.consumerId(consumerId).subType(subType)
.priorityLevel(priorityLevel)
.consumerName(consumerName).isDurable(isDurable)
.startMessageId(startMessageId).metadata(metadata)
.readCompacted(readCompacted)
.initialPosition(initialPosition)
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
.replicatedSubscriptionStateArg(isReplicated)
.keySharedMeta(keySharedMeta)
.subscriptionProperties(subscriptionProperties)
.consumerEpoch(consumerEpoch)
.schemaType(schema == null ? null : schema.getType())
.build();
// When a schema was sent and it is not AUTO_CONSUME, add it / check its compatibility
// before subscribing; otherwise subscribe directly.
if (schema != null && schema.getType() != SchemaType.AUTO_CONSUME) {
return ignoreUnrecoverableBKException
(topic.addSchemaIfIdleOrCheckCompatible(schema))
.thenCompose(v -> topic.subscribe(option));
} else {
return topic.subscribe(option);
}
});
})
.thenAccept(consumer -> {
// Topic migration applied to this consumer: drop the registration and return without
// completing the future or sending a success response from here.
if (consumer.checkAndApplyTopicMigration()) {
log.info("[{}] Disconnecting consumer {} on migrated subscription on topic {} / {}",
remoteAddress, consumerId, subscriptionName, topicName);
consumers.remove(consumerId, consumerFuture);
return;
}
// complete() returns true only if this call is the one that completed the future.
if (consumerFuture.complete(consumer)) {
log.info("[{}] Created subscription on topic {} / {}",
remoteAddress, topicName, subscriptionName);
commandSender.sendSuccessResponse(requestId);
if (brokerInterceptor != null) {
// Interceptor failures are logged and otherwise ignored; the success response has
// already been sent at this point.
try {
brokerInterceptor.consumerCreated(this, consumer, metadata);
} catch (Throwable t) {
log.error("Exception occur when intercept consumer created.", t);
}
}
} else {
// The consumer future was completed before by a close command
try {
consumer.close();
log.info("[{}] Cleared consumer created after timeout on client side {}",
remoteAddress, consumer);
} catch (BrokerServiceException e) {
log.warn(
"[{}] Error closing consumer created"
+ " after timeout on client side {}: {}",
remoteAddress, consumer, e.getMessage());
}
consumers.remove(consumerId, consumerFuture);
}
})
.exceptionally(exception -> {
// NOTE(review): this handler dereferences exception.getCause() unconditionally, i.e. it
// relies on the failure arriving wrapped (e.g. in a CompletionException) with a non-null
// cause - confirm this holds for every upstream failure.
if (exception.getCause() instanceof ConsumerBusyException) {
if (log.isDebugEnabled()) {
log.debug(
"[{}][{}][{}] Failed to create consumer because exclusive consumer"
+ " is already connected: {}",
remoteAddress, topicName, subscriptionName,
exception.getCause().getMessage());
}
} else if (exception.getCause() instanceof BrokerServiceException.TopicMigratedException) {
Optional<ClusterUrl> clusterURL = getMigratedClusterUrl(service.getPulsar(),
topicName.toString());
// With a known migrated cluster URL the client is redirected and this handler returns
// early; without one, control falls through to the generic failure handling below.
if (clusterURL.isPresent()) {
log.info("[{}] redirect migrated consumer to topic {}: "
+ "consumerId={}, subName={}, {}", remoteAddress,
topicName, consumerId, subscriptionName, exception.getCause().getMessage());
boolean msgSent = commandSender.sendTopicMigrated(ResourceType.Consumer, consumerId,
clusterURL.get().getBrokerServiceUrl(),
clusterURL.get().getBrokerServiceUrlTls());
if (!msgSent) {
log.info("consumer client doesn't support topic migration handling {}-{}-{}",
topicName, remoteAddress, consumerId);
}
consumers.remove(consumerId, consumerFuture);
closeConsumer(consumerId, Optional.empty());
return null;
}
} else if (exception.getCause() instanceof BrokerServiceException) {
// Other broker service exceptions: log the message only, without a stack trace.
log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
remoteAddress, topicName, subscriptionName,
consumerId, exception.getCause().getMessage());
} else {
// Any other failure: the trailing exception argument makes the logger include the stack trace.
log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
remoteAddress, topicName, subscriptionName,
consumerId, exception.getCause().getMessage(), exception);
}
// If client timed out, the future would have been completed by subsequent close.
// Send error back to client, only if not completed already.
if (consumerFuture.completeExceptionally(exception)) {
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(exception.getCause()),
exception.getCause().getMessage());
}
consumers.remove(consumerId, consumerFuture);
return null;
});
} else {
// Not authorized: drop this request's registration and report an AuthorizationError.
String msg = "Client is not authorized to subscribe";
log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());
consumers.remove(consumerId, consumerFuture);
writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
}).exceptionally(ex -> {
// Reached when the authorization future completes exceptionally or the callback above throws;
// either way the client receives an AuthorizationError carrying the exception message.
logAuthException(remoteAddress, "subscribe", getPrincipal(), Optional.of(topicName), ex);
consumers.remove(consumerId, consumerFuture);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
return null;
});
}