in mrs_plugin/auth_apps.py [0:0]
def add_auth_app(app_name=None, service_id=None, **kwargs):
"""Adds an auth_app to the given MRS service
Args:
app_name (str): The app_name
service_id (str): The id of the service the schema should be added to
**kwargs: Additional options
Keyword Args:
auth_vendor_id (str): The auth_vendor_id
description (str): A description of the app
url (str): url of the app
url_direct_auth (str): url direct auth of the app
access_token (str): access_token of the app
app_id (str): app_id of the app
limit_to_registered_users (bool): Limit access to registered users
registered_users (list): List of registered users
default_role_id (str): The default role to be assigned to new users
enabled (int): Whether the Auth App is enabled
options (dict): Additional options
session (object): The database session to use
Returns:
A dict with content_set_id and number_of_files_uploaded
"""
if service_id:
service_id = lib.core.id_to_binary(service_id, "service_id")
lib.core.convert_ids_to_binary(
["auth_vendor_id", "default_role_id"], kwargs)
auth_vendor_id = kwargs.get("auth_vendor_id")
default_role_id = kwargs.get("default_role_id")
description = kwargs.get("description")
url = kwargs.get("url")
url_direct_auth = kwargs.get("url_direct_auth")
access_token = kwargs.get("access_token")
app_id = kwargs.get("app_id")
limit_to_reg_users = kwargs.get("limit_to_registered_users")
registered_users = kwargs.get("registered_users")
enabled = kwargs.get("enabled", 1)
options = kwargs.get("options")
interactive = lib.core.get_interactive_default()
with lib.core.MrsDbSession(exception_handler=lib.core.print_exception, **kwargs) as session:
if service_id is not None:
service = resolve_service(session, service_id)
if service is not None:
service_id = service["id"]
# Get auth_vendor_id
if not auth_vendor_id and interactive:
app_vendors = lib.core.select(table="auth_vendor",
cols=["id", "name"],
where="enabled=1"
).exec(session).items
if len(app_vendors) == 0:
raise ValueError("No authentication vendors enabled.")
app_vendor = lib.core.prompt_for_list_item(
item_list=app_vendors, prompt_caption=(
"Please select an authentication vendor: "),
item_name_property="name",
print_list=True)
if not app_vendor:
raise ValueError("Operation cancelled.")
auth_vendor_id = app_vendor['id']
if not auth_vendor_id:
raise ValueError("No authentication vendor specified.")
# Get app_name
if not app_name and interactive:
if auth_vendor_id == lib.auth_apps.MYSQL_AUTHENTICATION:
app_name = lib.core.prompt(
"Please enter the name of the authentication app "
"[MySQL Account Access]: ",
{'defaultValue': 'MySQL Account Access'})
else:
app_name = lib.core.prompt(
"Please enter the name of the authentication app: ")
if not app_name:
raise ValueError("Operation cancelled.")
if not app_name:
raise ValueError("No app name specified.")
# Get description
if not description and interactive:
if auth_vendor_id == lib.auth_apps.MYSQL_AUTHENTICATION:
description = lib.core.prompt(
"Please enter a description for the authentication app "
"[Authentication via MySQL accounts]",
{'defaultValue':
'Authentication via MySQL accounts'})
else:
description = lib.core.prompt(
"Please enter a description for the authentication app: ")
# Get limit_to_registered_users
if not limit_to_reg_users and interactive:
limit_to_reg_users = lib.core.prompt(
"Limit authentication to registered users? [y/N]: ",
{'defaultValue': 'n'}).strip().lower() == 'y'
# Get registered_users, convert to list
if limit_to_reg_users and not registered_users and interactive:
registered_users = lib.core.prompt(
"Please enter a list of registered user names, separated "
"by comma (,): ")
registered_users = registered_users.split(',')
registered_users = [reg_user.strip()
for reg_user in registered_users]
default_role_id = default_role_id or lib.auth_apps.DEFAULT_ROLE_ID
with lib.core.MrsDbTransaction(session):
# Create the auth_app
auth_app_id = lib.auth_apps.add_auth_app(session, service_id, auth_vendor_id,
app_name, description, url, url_direct_auth, access_token, app_id,
limit_to_reg_users, default_role_id, enabled, options)
# Create the registered_users if specified
if registered_users and len(registered_users) > 0:
role_comments = "Default role." if default_role_id == lib.auth_apps.DEFAULT_ROLE_ID else ""
for reg_user in registered_users:
user_id = lib.users.add_user(session, auth_app_id, reg_user, None, None, None,
None, None, None)
if default_role_id:
lib.users.add_user_role(
session, user_id, default_role_id, role_comments)
if lib.core.get_interactive_result():
return f"\nAuthentication app with the id {auth_app_id} was added successfully."
else:
return {
"auth_app_id": auth_app_id
}