default AuthorizationResult authorizeByResourceType()

in clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java [184:293]


    default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
        SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);

        // Check a hard-coded name to ensure that super users are granted
        // access regardless of DENY ACLs.
        if (authorize(requestContext, Collections.singletonList(new Action(
                op, new ResourcePattern(resourceType, "hardcode", PatternType.LITERAL),
                0, true, false)))
                .get(0) == AuthorizationResult.ALLOWED) {
            return AuthorizationResult.ALLOWED;
        }

        // Filter out all the resource pattern corresponding to the RequestContext,
        // AclOperation, and ResourceType
        ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
            resourceType, null, PatternType.ANY);
        AclBindingFilter aclFilter = new AclBindingFilter(
            resourceTypeFilter, AccessControlEntryFilter.ANY);

        EnumMap<PatternType, Set<String>> denyPatterns =
            new EnumMap<>(PatternType.class) {{
                    put(PatternType.LITERAL, new HashSet<>());
                    put(PatternType.PREFIXED, new HashSet<>());
                }};
        EnumMap<PatternType, Set<String>> allowPatterns =
            new EnumMap<>(PatternType.class) {{
                    put(PatternType.LITERAL, new HashSet<>());
                    put(PatternType.PREFIXED, new HashSet<>());
                }};

        boolean hasWildCardAllow = false;

        KafkaPrincipal principal = new KafkaPrincipal(
            requestContext.principal().getPrincipalType(),
            requestContext.principal().getName());
        String hostAddr = requestContext.clientAddress().getHostAddress();

        for (AclBinding binding : acls(aclFilter)) {
            if (!binding.entry().host().equals(hostAddr) && !binding.entry().host().equals("*"))
                continue;

            if (!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal)
                    && !binding.entry().principal().equals("User:*"))
                continue;

            if (binding.entry().operation() != op
                    && binding.entry().operation() != AclOperation.ALL)
                continue;

            if (binding.entry().permissionType() == AclPermissionType.DENY) {
                switch (binding.pattern().patternType()) {
                    case LITERAL:
                        // If wildcard deny exists, return deny directly
                        if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))
                            return AuthorizationResult.DENIED;
                        denyPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
                        break;
                    case PREFIXED:
                        denyPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
                        break;
                    default:
                }
                continue;
            }

            if (binding.entry().permissionType() != AclPermissionType.ALLOW)
                continue;

            switch (binding.pattern().patternType()) {
                case LITERAL:
                    if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
                        hasWildCardAllow = true;
                        continue;
                    }
                    allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
                    break;
                case PREFIXED:
                    allowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
                    break;
                default:
            }
        }

        if (hasWildCardAllow) {
            return AuthorizationResult.ALLOWED;
        }

        // For any literal allowed, if there's no dominant literal and prefix denied, return allow.
        // For any prefix allowed, if there's no dominant prefix denied, return allow.
        for (Map.Entry<PatternType, Set<String>> entry : allowPatterns.entrySet()) {
            for (String allowStr : entry.getValue()) {
                if (entry.getKey() == PatternType.LITERAL
                        && denyPatterns.get(PatternType.LITERAL).contains(allowStr))
                    continue;
                StringBuilder sb = new StringBuilder();
                boolean hasDominatedDeny = false;
                for (char ch : allowStr.toCharArray()) {
                    sb.append(ch);
                    if (denyPatterns.get(PatternType.PREFIXED).contains(sb.toString())) {
                        hasDominatedDeny = true;
                        break;
                    }
                }
                if (!hasDominatedDeny)
                    return AuthorizationResult.ALLOWED;
            }
        }

        return AuthorizationResult.DENIED;
    }