in clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java [184:293]
default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);
// Check a hard-coded name to ensure that super users are granted
// access regardless of DENY ACLs.
if (authorize(requestContext, Collections.singletonList(new Action(
op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL),
0, true, false)))
.get(0) == AuthorizationResult.ALLOWED) {
return AuthorizationResult.ALLOWED;
}
// Filter out all the resource pattern corresponding to the RequestContext,
// AclOperation, and ResourceType
ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
resourceType, null, PatternType.ANY);
AclBindingFilter aclFilter = new AclBindingFilter(
resourceTypeFilter, AccessControlEntryFilter.ANY);
EnumMap<PatternType, Set<String>> denyPatterns =
new EnumMap<>(PatternType.class) {{
put(PatternType.LITERAL, new HashSet<>());
put(PatternType.PREFIXED, new HashSet<>());
}};
EnumMap<PatternType, Set<String>> allowPatterns =
new EnumMap<>(PatternType.class) {{
put(PatternType.LITERAL, new HashSet<>());
put(PatternType.PREFIXED, new HashSet<>());
}};
boolean hasWildCardAllow = false;
KafkaPrincipal principal = new KafkaPrincipal(
requestContext.principal().getPrincipalType(),
requestContext.principal().getName());
String hostAddr = requestContext.clientAddress().getHostAddress();
for (AclBinding binding : acls(aclFilter)) {
if (!binding.entry().host().equals(hostAddr) && !binding.entry().host().equals("*"))
continue;
if (!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal)
&& !binding.entry().principal().equals("User:*"))
continue;
if (binding.entry().operation() != op
&& binding.entry().operation() != AclOperation.ALL)
continue;
if (binding.entry().permissionType() == AclPermissionType.DENY) {
switch (binding.pattern().patternType()) {
case LITERAL:
// If wildcard deny exists, return deny directly
if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))
return AuthorizationResult.DENIED;
denyPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
break;
case PREFIXED:
denyPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
break;
default:
}
continue;
}
if (binding.entry().permissionType() != AclPermissionType.ALLOW)
continue;
switch (binding.pattern().patternType()) {
case LITERAL:
if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
hasWildCardAllow = true;
continue;
}
allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
break;
case PREFIXED:
allowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
break;
default:
}
}
if (hasWildCardAllow) {
return AuthorizationResult.ALLOWED;
}
// For any literal allowed, if there's no dominant literal and prefix denied, return allow.
// For any prefix allowed, if there's no dominant prefix denied, return allow.
for (Map.Entry<PatternType, Set<String>> entry : allowPatterns.entrySet()) {
for (String allowStr : entry.getValue()) {
if (entry.getKey() == PatternType.LITERAL
&& denyPatterns.get(PatternType.LITERAL).contains(allowStr))
continue;
StringBuilder sb = new StringBuilder();
boolean hasDominatedDeny = false;
for (char ch : allowStr.toCharArray()) {
sb.append(ch);
if (denyPatterns.get(PatternType.PREFIXED).contains(sb.toString())) {
hasDominatedDeny = true;
break;
}
}
if (!hasDominatedDeny)
return AuthorizationResult.ALLOWED;
}
}
return AuthorizationResult.DENIED;
}