in pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java [640:693]
public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
String role,
TopicOperation operation,
AuthenticationDataSource authData) {
if (log.isDebugEnabled()) {
log.debug("Check allowTopicOperationAsync [{}] on [{}].", operation.name(), topicName);
}
return validateTenantAdminAccess(topicName.getTenant(), role, authData)
.thenCompose(isSuperUserOrAdmin -> {
if (log.isDebugEnabled()) {
log.debug("Verify if role {} is allowed to {} to topic {}: isSuperUserOrAdmin={}",
role, operation, topicName, isSuperUserOrAdmin);
}
if (isSuperUserOrAdmin) {
return CompletableFuture.completedFuture(true);
} else {
switch (operation) {
case LOOKUP:
case GET_STATS:
case GET_METADATA:
return canLookupAsync(topicName, role, authData);
case PRODUCE:
return canProduceAsync(topicName, role, authData);
case GET_SUBSCRIPTIONS:
case CONSUME:
case SUBSCRIBE:
case UNSUBSCRIBE:
case SKIP:
case EXPIRE_MESSAGES:
case PEEK_MESSAGES:
case RESET_CURSOR:
case GET_BACKLOG_SIZE:
case SET_REPLICATED_SUBSCRIPTION_STATUS:
case GET_REPLICATED_SUBSCRIPTION_STATUS:
return canConsumeAsync(topicName, role, authData, authData.getSubscription());
case TERMINATE:
case COMPACT:
case OFFLOAD:
case UNLOAD:
case TRIM_TOPIC:
case DELETE_METADATA:
case UPDATE_METADATA:
case ADD_BUNDLE_RANGE:
case GET_BUNDLE_RANGE:
case DELETE_BUNDLE_RANGE:
return CompletableFuture.completedFuture(false);
default:
return FutureUtil.failedFuture(new IllegalStateException(
"TopicOperation [" + operation.name() + "] is not supported."));
}
}
});
}