in source/idea/idea-cluster-manager/src/ideaclustermanager/app/api/authz_api.py [0:0]
def invoke(self, context: ApiInvocationContext):
namespace = context.namespace
acl_entry = Utils.get_value_as_dict(namespace, self.acl)
if acl_entry is None:
raise exceptions.unauthorized_access()
acl_entry_scope = Utils.get_value_as_string('scope', acl_entry)
is_authorized = context.is_authorized(elevated_access=True, scopes=[acl_entry_scope])
if is_authorized:
acl_entry['method'](context)
return
username = context.get_username()
# Conditional permissions for non-admins
if namespace == 'Authz.BatchPutRoleAssignment':
request = context.get_request_payload_as(BatchPutRoleAssignmentRequest)
project_ids = list(set([assignment.resource_id for assignment in request.items]))
# Current user must have "update_personnel" permission for all projects in the Put request
if all([context.is_authorized(elevated_access=False, scopes=[acl_entry_scope], role_assignment_resource_key=f"{project_id}:project") for project_id in project_ids]):
acl_entry['method'](context)
return
elif namespace == 'Authz.BatchDeleteRoleAssignment':
request = context.get_request_payload_as(BatchDeleteRoleAssignmentRequest)
project_ids = list(set([assignment.resource_id for assignment in request.items]))
# Current user must have "update_personnel" permission for all projects in the Delete request
if all([context.is_authorized(elevated_access=False, scopes=[acl_entry_scope], role_assignment_resource_key=f"{project_id}:project") for project_id in project_ids]):
acl_entry['method'](context)
return
elif namespace == 'Authz.ListRoleAssignments':
# Current user must be assigned to a project in the application
projects_assigned = self.context.projects.get_user_projects(GetUserProjectsRequest(username=username, exclude_disabled=False))
if not Utils.is_empty(projects_assigned.projects):
acl_entry['method'](context)
return
elif namespace == 'Authz.ListRoles':
# Current user must be assigned to a project in the application
projects_assigned = self.context.projects.get_user_projects(GetUserProjectsRequest(username=username, exclude_disabled=False))
if not Utils.is_empty(projects_assigned.projects):
acl_entry['method'](context)
return
elif namespace == 'Authz.GetRole':
# Current user must be assigned to a project in the application
projects_assigned = self.context.projects.get_user_projects(GetUserProjectsRequest(username=username, exclude_disabled=False))
if not Utils.is_empty(projects_assigned.projects):
acl_entry['method'](context)
return
raise exceptions.unauthorized_access()