in fluss-server/src/main/java/com/alibaba/fluss/server/authorizer/DefaultAuthorizer.java [228:306]
public List<AclDeleteResult> dropAcls(
Session session, List<AclBindingFilter> aclBindingFilters) {
Map<AclBinding, Integer> deletedBindings = new HashMap<>();
Map<AclBinding, ApiError> deleteExceptions = new HashMap<>();
List<Tuple2<AclBindingFilter, Integer>> filters =
IntStream.range(0, aclBindingFilters.size())
.mapToObj(i -> Tuple2.of(aclBindingFilters.get(i), i))
.collect(Collectors.toList());
synchronized (lock) {
Set<Resource> resources = new HashSet<>(aclCache.keySet());
Map<Resource, List<Tuple2<AclBindingFilter, Integer>>> resourcesToUpdate =
new HashMap<>();
for (Resource resource : resources) {
List<Tuple2<AclBindingFilter, Integer>> matchingFilters = new ArrayList<>();
for (Tuple2<AclBindingFilter, Integer> filter : filters) {
if (filter.f0.getResourceFilter().matches(resource)) {
matchingFilters.add(filter);
}
}
if (!matchingFilters.isEmpty()) {
resourcesToUpdate.put(resource, matchingFilters);
}
}
authorizeAclOperation(session, resourcesToUpdate.keySet());
for (Map.Entry<Resource, List<Tuple2<AclBindingFilter, Integer>>> entry :
resourcesToUpdate.entrySet()) {
Resource resource = entry.getKey();
List<Tuple2<AclBindingFilter, Integer>> matchingFilters = entry.getValue();
Map<AclBinding, Integer> resourceBindingsBeingDeleted = new HashMap<>();
try {
updateResourceAcl(
resource,
currentAcls -> {
Set<AccessControlEntry> aclsToRemove = new HashSet<>();
for (AccessControlEntry acl : currentAcls) {
for (Tuple2<AclBindingFilter, Integer> filter :
matchingFilters) {
if (filter.f0.getEntryFilter().matches(acl)) {
AclBinding binding = new AclBinding(resource, acl);
deletedBindings.putIfAbsent(binding, filter.f1);
resourceBindingsBeingDeleted.putIfAbsent(
binding, filter.f1);
aclsToRemove.add(acl);
}
}
}
return Sets.difference(currentAcls, aclsToRemove);
});
} catch (Exception e) {
for (AclBinding binding : resourceBindingsBeingDeleted.keySet()) {
ApiError apiError = ApiError.fromThrowable(e);
deleteExceptions.putIfAbsent(binding, apiError);
}
}
}
}
Map<Integer, Set<AclDeleteResult.AclBindingDeleteResult>> deletedResult = new HashMap<>();
for (Map.Entry<AclBinding, Integer> entry : deletedBindings.entrySet()) {
deletedResult
.computeIfAbsent(entry.getValue(), k -> new HashSet<>())
.add(
new AclDeleteResult.AclBindingDeleteResult(
entry.getKey(),
deleteExceptions.getOrDefault(entry.getKey(), null)));
}
List<AclDeleteResult> results = new ArrayList<>();
for (int i = 0; i < aclBindingFilters.size(); i++) {
Set<AclDeleteResult.AclBindingDeleteResult> bindings =
deletedResult.getOrDefault(i, Collections.emptySet());
results.add(new AclDeleteResult(bindings));
}
return results;
}