azext_iot/central/services/user.py (201 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import requests
from typing import List
from knack.log import get_logger
from azure.cli.core.azclierror import AzureResponseError
from azure.cli.core.util import should_disable_connection_verify
from azext_iot.constants import CENTRAL_ENDPOINT
from azext_iot.central.common import API_VERSION
from azext_iot.central.services import _utility
from azext_iot.central.models.enum import (
Role,
UserType,
get_enum_keys,
)
from azext_iot.central.models.ga_2022_07_31 import UserGa
from re import search
# Module-level logger named after this module.
logger = get_logger(__name__)
# Path segment appended to "https://<app_id>.<dns_suffix>/" to reach the users endpoint.
BASE_PATH = "api/users"
# Model name handed to _utility.get_object when converting responses.
MODEL = "User"
# Regex for an organization-scoped role entry: a run of non-whitespace characters,
# two literal backslash characters, then another run of non-whitespace characters.
# _create_roles uses group 1 as the organization and group 2 as the role.
ROLE_PATTERN = r"([\S]+)\\\\([\S]+)"
def _make_call(
    cmd,
    app_id: str,
    method: str,
    path: str,
    payload: str,
    token: str,
    central_dns_suffix: str,
    api_version=API_VERSION,
    url=None,
) -> dict:
    """
    Issue one HTTP request against the users endpoint and return its result.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming the request URL when url is None)
        method: HTTP verb; it is upper-cased before the request is sent
        path: segment appended to the URL; nothing is appended when None
        payload: body sent as JSON; None means the request carries no body
        token: authorization token forwarded to _utility.get_headers
        central_dns_suffix: DNS suffix used for forming the request URL when url is None
        api_version: value sent as the "api-version" query parameter
        url: ready-made URL used instead of building one from app_id and suffix

    Returns:
        the value produced by _utility.try_extract_result for the response
    """
    target = url
    if target is None:
        target = "https://{}.{}/{}".format(app_id, central_dns_suffix, BASE_PATH)
    if path is not None:
        target = "{}/{}".format(target, path)

    # Only advertise a JSON body in the headers when one is actually sent.
    has_body = payload is not None
    headers = _utility.get_headers(token, cmd, has_json_payload=has_body)

    response = requests.request(
        url=target,
        method=method.upper(),
        headers=headers,
        params={"api-version": api_version},
        json=payload,
        verify=not should_disable_connection_verify(),
    )
    return _utility.try_extract_result(response)
def _create_roles(roles: str):
    """
    Convert a comma-separated role string into a list of role-assignment dicts.

    Each entry is either a plain role or an organization-scoped role matching
    ROLE_PATTERN (group 1 -> organization, group 2 -> role). A role name that
    is a key of the Role enum is replaced by that member's value; any other
    name is passed through unchanged.

    Whitespace around each entry is stripped and empty entries (for example
    from a trailing comma) are skipped, so "a, b," yields exactly two roles.

    Args:
        roles: comma-separated role entries

    Returns:
        list of dicts, each holding "role" and, for organization-scoped
        entries, "organization"
    """
    # Loop-invariant; materialised once so it is safe to reuse for every entry.
    known_roles = frozenset(get_enum_keys(Role))

    def _resolve(name: str) -> str:
        # Map a built-in role name to its enum value, leave anything else as-is.
        return Role[name].value if name in known_roles else name

    result_roles = []
    for raw_role in roles.split(","):
        role = raw_role.strip()
        if not role:
            # Skip blanks produced by stray commas instead of sending {"role": ""}.
            continue
        match = search(ROLE_PATTERN, role)
        if match:
            # Organization-scoped role: both groups are non-empty by construction.
            result_roles.append(
                {"role": _resolve(match[2]), "organization": match[1]}
            )
        else:
            result_roles.append({"role": _resolve(role)})
    return result_roles
def add_or_update_service_principal_user(
    cmd,
    app_id: str,
    assignee: str,
    tenant_id: str,
    object_id: str,
    roles: str,
    token: str,
    api_version=API_VERSION,
    update=False,
    central_dns_suffix=CENTRAL_ENDPOINT,
) -> UserGa:
    """
    Create or update a service principal user in a Central app.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming request URL)
        assignee: user ID; appended to the users URL as the request path
        tenant_id: tenant id of the service principal; sent as "tenantId" when truthy
        object_id: object id of the service principal; sent as "objectId" when truthy
        roles: comma-separated roles; parsed by _create_roles when truthy
        token: authorization token forwarded with the request.
            MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
        api_version: NOTE - overwritten with API_VERSION before use
        update: when truthy the request uses PATCH, otherwise PUT
        central_dns_suffix: {centralDnsSuffixInPath} as found in docs

    Returns:
        user: object built by _utility.get_object from the response
    """
    api_version = API_VERSION

    body = {"type": UserType.service_principal.value}
    if roles:
        body["roles"] = _create_roles(roles)
    # Identity fields are only included when the caller supplied them.
    for key, value in (("tenantId", tenant_id), ("objectId", object_id)):
        if value:
            body[key] = value

    if update:
        http_method = "patch"
    else:
        http_method = "put"

    response_body = _make_call(
        cmd,
        app_id=app_id,
        method=http_method,
        path=assignee,
        payload=body,
        token=token,
        central_dns_suffix=central_dns_suffix,
        api_version=api_version,
    )
    return _utility.get_object(response_body, MODEL, api_version)
def add_or_update_email_user(
    cmd,
    app_id: str,
    assignee: str,
    email: str,
    roles: str,
    token: str,
    api_version=API_VERSION,
    update=False,
    central_dns_suffix=CENTRAL_ENDPOINT,
) -> UserGa:
    """
    Create or update an email user in a Central app.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming request URL)
        assignee: user ID; appended to the users URL as the request path
        email: email of the user; only sent when truthy and update is falsy
        roles: comma-separated roles; parsed by _create_roles when truthy,
            otherwise an empty list is sent
        token: authorization token forwarded with the request.
            MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
        api_version: NOTE - overwritten with API_VERSION before use
        update: when truthy the request uses PATCH, otherwise PUT
        central_dns_suffix: {centralDnsSuffixInPath} as found in docs

    Returns:
        user: object built by _utility.get_object from the response
    """
    api_version = API_VERSION

    body = {
        "type": UserType.email.value,
        "roles": _create_roles(roles) if roles else [],
    }
    # The email address is part of the payload on creation only.
    if not update and email:
        body["email"] = email

    if update:
        http_method = "patch"
    else:
        http_method = "put"

    response_body = _make_call(
        cmd,
        app_id=app_id,
        method=http_method,
        path=assignee,
        payload=body,
        token=token,
        central_dns_suffix=central_dns_suffix,
        api_version=api_version,
    )
    return _utility.get_object(response_body, MODEL, api_version)
def get_user_list(
    cmd,
    app_id: str,
    token: str,
    api_version=API_VERSION,
    max_pages=0,
    central_dns_suffix=CENTRAL_ENDPOINT,
) -> List[UserGa]:
    """
    Collect the users of a Central app, following "nextLink" paging.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming request URL)
        token: authorization token forwarded with each request.
            MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
        api_version: NOTE - overwritten with API_VERSION before use
        max_pages: maximum number of pages to fetch; 0 means no limit
        central_dns_suffix: {centralDnsSuffixInPath} as found in docs

    Returns:
        users: list of objects built by _utility.get_object, one per entry
        of each page's "value"

    Raises:
        AzureResponseError: when a page has no "value" key
    """
    api_version = API_VERSION

    collected = []
    page_count = 0
    next_url = "https://{}.{}/{}".format(app_id, central_dns_suffix, BASE_PATH)

    while next_url:
        # A max_pages of 0 disables the page limit entirely.
        if max_pages != 0 and page_count >= max_pages:
            break

        page = _make_call(
            cmd,
            app_id=app_id,
            url=next_url,
            method="get",
            path=None,
            payload=None,
            token=token,
            central_dns_suffix=central_dns_suffix,
            api_version=api_version,
        )
        if "value" not in page:
            raise AzureResponseError("Value is not present in body: {}".format(page))

        for entry in page["value"]:
            collected.append(_utility.get_object(entry, MODEL, api_version))

        # A missing "nextLink" yields None, which ends the loop.
        next_url = page.get("nextLink")
        page_count += 1

    return collected
def get_user(
    cmd,
    app_id: str,
    token: str,
    assignee: str,
    api_version=API_VERSION,
    central_dns_suffix=CENTRAL_ENDPOINT,
) -> UserGa:
    """
    Fetch a single user of a Central app.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming request URL)
        token: authorization token forwarded with the request.
            MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
        assignee: unique ID of the user; appended to the users URL
        api_version: NOTE - overwritten with API_VERSION before use
        central_dns_suffix: {centralDnsSuffixInPath} as found in docs

    Returns:
        user: object built by _utility.get_object from the response
    """
    api_version = API_VERSION
    request_args = {
        "app_id": app_id,
        "method": "get",
        "path": assignee,
        "payload": None,
        "token": token,
        "central_dns_suffix": central_dns_suffix,
        "api_version": api_version,
    }
    response_body = _make_call(cmd, **request_args)
    return _utility.get_object(response_body, MODEL, api_version)
def delete_user(
    cmd,
    app_id: str,
    token: str,
    assignee: str,
    api_version=API_VERSION,
    central_dns_suffix=CENTRAL_ENDPOINT,
) -> dict:
    """
    Delete a user from the app.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming request URL)
        token: authorization token forwarded with the request.
            MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
        assignee: unique ID of the user; appended to the users URL
        api_version: NOTE - overwritten with API_VERSION before use
        central_dns_suffix: {centralDnsSuffixInPath} as found in docs

    Returns:
        the result of the DELETE call exactly as returned by _make_call
    """
    api_version = API_VERSION
    request_args = {
        "app_id": app_id,
        "method": "delete",
        "path": assignee,
        "payload": None,
        "token": token,
        "central_dns_suffix": central_dns_suffix,
        "api_version": api_version,
    }
    return _make_call(cmd, **request_args)