in mrs_plugin/lib/MrsDdlExecutor.py [0:0]
def showRestRoles(self, mrs_object: dict):
    """Handle a SHOW REST ROLES request and record its outcome.

    Depending on which of ``user_name`` / ``auth_app_name`` are present in
    ``mrs_object`` and whether a specific service applies, one of three
    lookups is performed:

    * both names given and a specific service resolved: the user is looked
      up and the roles of that user are listed;
    * at least one name given otherwise: granted roles are listed,
      including the users they are granted to;
    * no name given: the roles are listed, including global ones.

    A result entry (type ``"success"``) holding the rows and the column
    names is appended to ``self.results``.

    Args:
        mrs_object: Request dictionary. ``current_operation`` is popped
            from it; ``user_name``, ``auth_app_name``, ``any_service`` and
            ``line`` are read from it, and it is passed on to
            ``get_given_or_current_service_id``.

    Raises:
        Exception: Any error raised during the lookup (including the
            "not found" error for an unknown user) is recorded in
            ``self.results`` as an ``"error"`` entry and then re-raised.
    """
    timer = Timer()
    self.current_operation = mrs_object.pop("current_operation")

    user_name = mrs_object.get("user_name")
    auth_app_name = mrs_object.get("auth_app_name")
    any_service = mrs_object.get("any_service", False)

    show_users = False
    show_services = False

    try:
        service_id = self.get_given_or_current_service_id(
            mrs_object, allow_not_set=True)
        # Without a resolvable service the listing cannot be restricted to
        # one, so it is treated like an explicit "any service" request.
        if not service_id:
            any_service = True

        has_target = user_name is not None and auth_app_name is not None

        if has_target and not any_service:
            user = lib.users.get_user(
                self.session,
                user_name=user_name,
                auth_app_name=auth_app_name,
                service_id=service_id,
            )
            if not user:
                raise Exception(f"User {lib.core.quote_user(user_name)}@{lib.core.quote_auth_app(auth_app_name)} not found")

            roles = lib.users.get_user_roles(
                session=self.session, user_id=user.get("id")
            )
        elif user_name or auth_app_name:
            show_users = True
            show_services = True
            roles = lib.roles.get_granted_roles(
                session=self.session,
                specific_to_service_id=service_id if not any_service else None,
                user_name=user_name,
                auth_app_name=auth_app_name,
                include_users=show_users,
            )
        else:
            show_services = True
            roles = lib.roles.get_roles(
                session=self.session,
                specific_to_service_id=service_id if not any_service else None,
                include_global=True
            )

        # The caption of the first column names what the roles are listed for.
        if has_target:
            target = lib.core.quote_user(user_name) + "@" + lib.core.quote_auth_app(auth_app_name)
            caption_column = f"REST roles for {target}"
        elif user_name:
            caption_column = f"REST roles for {user_name}"
        elif auth_app_name:
            caption_column = f"REST roles for @{auth_app_name}"
        else:
            caption_column = "REST role"

        # Build every row with exactly the keys announced in column_names
        # below. In particular "users" is emitted whenever show_users is
        # set, also for a fully qualified user@auth_app listed across all
        # services; previously that case announced the column but left it
        # out of the rows.
        result = []
        for role in roles:
            row = {caption_column: role.get("caption")}
            if has_target:
                row["comments"] = role.get("comments") or ""
            row["derived_from_role"] = role.get("derived_from_role_caption") or ""
            row["description"] = role.get("description") or ""
            row["options"] = role.get("options")
            if show_services:
                row["specific_to_service"] = (
                    role.get("specific_to_service_request_path") or ""
                )
            if show_users:
                row["users"] = role.get("users") or ""
            result.append(row)

        column_names = [
            caption_column,
            "derived_from_role",
            "description",
            "options",
        ]
        if show_services:
            column_names.append("specific_to_service")
        if has_target:
            column_names.append("comments")
        if show_users:
            column_names.append("users")

        self.results.append(
            {
                "statementIndex": len(self.results) + 1,
                "line": mrs_object.get("line"),
                "type": "success",
                "message": f'OK, {len(roles)} record{"s" if len(roles) > 1 else ""} received.',
                "operation": self.current_operation,
                "result": result,
                "columns": column_names,
                "executionTime": timer.elapsed(),
            }
        )
    except Exception as e:
        # Record the failure for the caller's result list, then propagate.
        self.results.append(
            {
                "statementIndex": len(self.results) + 1,
                "line": mrs_object.get("line"),
                "type": "error",
                "message": f"Cannot SHOW REST ROLES. {e}",
                "operation": self.current_operation,
            }
        )
        raise