azext_iot/central/services/enrollment_group.py (271 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import List
import requests
from knack.log import get_logger
from azure.cli.core.azclierror import AzureResponseError
from azext_iot.constants import CENTRAL_ENDPOINT
from azext_iot.central.services import _utility
from azext_iot.central.models.ga_2022_07_31 import EnrollmentGroupGa
from azext_iot.central.common import API_VERSION
# Module-level logger, named after this module.
logger = get_logger(__name__)
# Path segment appended to "https://<app_id>.<dns_suffix>/" to address enrollment group resources.
BASE_PATH = "api/enrollmentGroups"
# Model name handed to _utility.get_object when converting API results.
MODEL = "EnrollmentGroup"
def list_enrollment_groups(
    cmd,
    app_id: str,
    token: str,
    api_version=API_VERSION,
    max_pages=0,
    central_dns_suffix=CENTRAL_ENDPOINT,
) -> List[EnrollmentGroupGa]:
    """
    Get a list of all enrollment groups, following "nextLink" paging.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming request URL)
        token: (OPTIONAL) authorization token to fetch device details from IoTC.
            MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
        api_version: accepted for interface compatibility; requests are always
            issued with API_VERSION regardless of the value passed in
        max_pages: maximum number of result pages to fetch; 0 means no limit
        central_dns_suffix: {centralDnsSuffixInPath} as found in docs

    Returns:
        list of enrollment groups

    Raises:
        AzureResponseError: when a page of results has no "value" member
    """
    from urllib.parse import parse_qs, urlsplit

    # The caller-supplied api_version is intentionally not used (see docstring).
    query_parameters = {"api-version": API_VERSION}

    enrollment_groups = []
    url = "https://{}.{}/{}".format(app_id, central_dns_suffix, BASE_PATH)
    headers = _utility.get_headers(token, cmd)

    pages_processed = 0
    while (max_pages == 0 or pages_processed < max_pages) and url:
        # A nextLink may already carry its own api-version query parameter;
        # passing `params` again would append a duplicate to the URL, so the
        # parameter is only supplied when the URL does not already contain it.
        already_versioned = "api-version" in parse_qs(urlsplit(url).query)
        response = requests.get(
            url,
            headers=headers,
            params=None if already_versioned else query_parameters,
        )
        result = _utility.try_extract_result(response)

        if "value" not in result:
            raise AzureResponseError("Value is not present in body: {}".format(result))

        enrollment_groups.extend(
            EnrollmentGroupGa(enrollment_group) for enrollment_group in result["value"]
        )

        # A missing nextLink yields None, which ends the loop.
        url = result.get("nextLink")
        pages_processed += 1

    return enrollment_groups
def get_enrollment_group(
    cmd,
    app_id: str,
    group_id: str,
    token: str,
    api_version=API_VERSION,
    central_dns_suffix=CENTRAL_ENDPOINT,
) -> EnrollmentGroupGa:
    """
    Fetch a single enrollment group by its id.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming request URL)
        group_id: case sensitive enrollment group id
        token: (OPTIONAL) authorization token to fetch device details from IoTC.
            MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
        api_version: accepted for interface compatibility; the request is always
            issued with API_VERSION
        central_dns_suffix: {centralDnsSuffixInPath} as found in docs

    Returns:
        the object built by _utility.get_object from the API result
    """
    # The api_version argument is not forwarded; the call is pinned to API_VERSION.
    pinned_version = API_VERSION
    group_url = "https://{}.{}/{}/{}".format(app_id, central_dns_suffix, BASE_PATH, group_id)

    response = _utility.make_api_call(
        cmd,
        app_id=app_id,
        method="GET",
        url=group_url,
        payload=None,
        token=token,
        api_version=pinned_version,
        central_dnx_suffix=central_dns_suffix,
    )
    return _utility.get_object(response, model=MODEL, api_version=pinned_version)
def create_enrollment_group(
    cmd,
    app_id: str,
    attestation: str,
    primary_key: str,
    secondary_key: str,
    display_name: str,
    type: str,
    token: str,
    group_id: str,
    enabled: bool = True,
    etag: str = None,
    api_version=API_VERSION,
    central_dns_suffix=CENTRAL_ENDPOINT,
) -> EnrollmentGroupGa:
    """
    Create an enrollment group with a PUT request.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming request URL)
        attestation: the attestation mechanism for the enrollment group;
            None is treated as 'symmetricKey'
        primary_key: primary symmetric key; only sent for 'symmetricKey'
            attestation when at least one of the two keys is truthy
        secondary_key: secondary symmetric key; sent under the same condition
        display_name: display name of the enrollment group
        type: type of devices that connect through the group
        token: (OPTIONAL) authorization token to fetch enrollment details from IoTC.
            MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
        group_id: case sensitive enrollment group id
        enabled: whether the devices using the group are allowed to connect
            to IoT Central; omitted from the payload when None
        etag: ETag used to prevent conflict in enrollment group updates;
            omitted from the payload when None
        api_version: accepted for interface compatibility; the request is always
            issued with API_VERSION
        central_dns_suffix: {centralDnsSuffixInPath} as found in docs

    Returns:
        the object built by _utility.get_object from the API result
    """
    # The api_version argument is not forwarded; the call is pinned to API_VERSION.
    pinned_version = API_VERSION

    attestation_type = 'symmetricKey' if attestation is None else attestation
    attestation_payload = {"type": attestation_type}
    if attestation_type == 'symmetricKey' and (primary_key or secondary_key):
        # Both keys are sent as given, even when one of them is None.
        attestation_payload['symmetricKey'] = {
            'primaryKey': primary_key,
            'secondaryKey': secondary_key,
        }

    payload = {
        "displayName": display_name,
        "attestation": attestation_payload,
        "type": type,
    }
    # Optional members are only included when a value was supplied.
    for member, value in (('enabled', enabled), ('etag', etag)):
        if value is not None:
            payload[member] = value

    group_url = "https://{}.{}/{}/{}".format(app_id, central_dns_suffix, BASE_PATH, group_id)
    response = _utility.make_api_call(
        cmd,
        app_id=app_id,
        method="PUT",
        url=group_url,
        payload=payload,
        token=token,
        api_version=pinned_version,
        central_dnx_suffix=central_dns_suffix,
    )
    return _utility.get_object(response, model=MODEL, api_version=pinned_version)
def update_enrollment_group(
    cmd,
    app_id: str,
    display_name: str,
    type: str,
    token: str,
    group_id: str,
    enabled: bool = True,
    etag : str = None,
    api_version=API_VERSION,
    central_dns_suffix=CENTRAL_ENDPOINT,
) -> EnrollmentGroupGa:
    """
    Update an enrollment group with a PATCH request.

    Only arguments that are not None are placed in the request body.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming request URL)
        display_name: display name of the enrollment group
        type: type of devices that connect through the group
        token: (OPTIONAL) authorization token to fetch enrollment details from IoTC.
            MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
        group_id: case sensitive enrollment group id
        enabled: whether the devices using the group are allowed to connect
            to IoT Central
        etag: ETag used to prevent conflict in enrollment group updates
        api_version: accepted for interface compatibility; the request is always
            issued with API_VERSION
        central_dns_suffix: {centralDnsSuffixInPath} as found in docs

    Returns:
        the object built by _utility.get_object from the API result
    """
    # The api_version argument is not forwarded; the call is pinned to API_VERSION.
    pinned_version = API_VERSION

    # Listed in the order the members are inserted into the payload.
    candidate_members = (
        ("displayName", display_name),
        ('enabled', enabled),
        ('etag', etag),
        ("type", type),
    )
    payload = {
        member: value for member, value in candidate_members if value is not None
    }

    group_url = "https://{}.{}/{}/{}".format(app_id, central_dns_suffix, BASE_PATH, group_id)
    response = _utility.make_api_call(
        cmd,
        app_id=app_id,
        method="PATCH",
        url=group_url,
        payload=payload,
        token=token,
        api_version=pinned_version,
        central_dnx_suffix=central_dns_suffix,
    )
    return _utility.get_object(response, MODEL, pinned_version)
def delete_enrollment_group(
    cmd,
    app_id: str,
    group_id: str,
    token: str,
    api_version=API_VERSION,
    central_dns_suffix=CENTRAL_ENDPOINT,
) -> dict:
    """
    Delete an enrollment group with a DELETE request.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming request URL)
        group_id: case sensitive enrollment group id
        token: (OPTIONAL) authorization token to fetch device details from IoTC.
            MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
        api_version: accepted for interface compatibility; the request is always
            issued with API_VERSION
        central_dns_suffix: {centralDnsSuffixInPath} as found in docs

    Returns:
        whatever _utility.make_api_call returns for the request
    """
    group_url = "https://{}.{}/{}/{}".format(app_id, central_dns_suffix, BASE_PATH, group_id)
    # The api_version argument is not forwarded; the call is pinned to API_VERSION.
    response = _utility.make_api_call(
        cmd,
        app_id=app_id,
        method="DELETE",
        url=group_url,
        payload=None,
        token=token,
        api_version=API_VERSION,
        central_dnx_suffix=central_dns_suffix,
    )
    return response
def create_x509(
    cmd,
    app_id: str,
    group_id: str,
    primary_cert: str,
    secondary_cert: str,
    etag: str,
    token: str,
    api_version=API_VERSION,
    central_dns_suffix=CENTRAL_ENDPOINT,
) -> dict:
    """
    Set the primary or secondary x509 certificate of an enrollment group.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming request URL)
        group_id: case sensitive enrollment group id
        primary_cert: certificate for the 'primary' entry; takes precedence
            whenever it is not None
        secondary_cert: certificate for the 'secondary' entry; used only when
            primary_cert is None
        etag: included in the request body as 'etag' when not None
        token: (OPTIONAL) authorization token to fetch device details from IoTC.
            MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
        api_version: accepted for interface compatibility; the request is always
            issued with API_VERSION
        central_dns_suffix: {centralDnsSuffixInPath} as found in docs

    Returns:
        whatever _utility.make_api_call returns for the request
    """
    # Choose the certificate slot and its content together.
    if primary_cert is not None:
        entry, certificate = 'primary', primary_cert
    else:
        entry, certificate = 'secondary', secondary_cert

    payload = {"certificate": certificate}
    if etag is not None:
        payload['etag'] = etag

    certificate_url = "https://{}.{}/{}/{}/certificates/{}".format(
        app_id, central_dns_suffix, BASE_PATH, group_id, entry)
    # The api_version argument is not forwarded; the call is pinned to API_VERSION.
    response = _utility.make_api_call(
        cmd,
        app_id=app_id,
        method="PUT",
        url=certificate_url,
        payload=payload,
        token=token,
        api_version=API_VERSION,
        central_dnx_suffix=central_dns_suffix,
    )
    return response
def get_x509(
    cmd,
    app_id: str,
    group_id: str,
    certificate_entry: str,
    token: str,
    api_version=API_VERSION,
    central_dns_suffix=CENTRAL_ENDPOINT,
) -> dict:
    """
    Get the primary or secondary x509 certificate of an enrollment group.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming request URL)
        group_id: case sensitive enrollment group id
        certificate_entry: entry of certificate, only support primary and
            secondary; used as the last segment of the request URL
        token: (OPTIONAL) authorization token to fetch device details from IoTC.
            MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
        api_version: accepted for interface compatibility; the request is always
            issued with API_VERSION
        central_dns_suffix: {centralDnsSuffixInPath} as found in docs

    Returns:
        whatever _utility.make_api_call returns for the request
    """
    certificate_url = "https://{}.{}/{}/{}/certificates/{}".format(
        app_id, central_dns_suffix, BASE_PATH, group_id, certificate_entry)
    # The api_version argument is not forwarded; the call is pinned to API_VERSION.
    response = _utility.make_api_call(
        cmd,
        app_id=app_id,
        method="GET",
        url=certificate_url,
        payload=None,
        token=token,
        api_version=API_VERSION,
        central_dnx_suffix=central_dns_suffix,
    )
    return response
def delete_x509(
    cmd,
    app_id: str,
    group_id: str,
    certificate_entry: str,
    token: str,
    api_version=API_VERSION,
    central_dns_suffix=CENTRAL_ENDPOINT,
) -> dict:
    """
    Remove the primary or secondary x509 certificate of an enrollment group.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming request URL)
        group_id: case sensitive enrollment group id
        certificate_entry: entry of certificate, only support primary and
            secondary; used as the last segment of the request URL
        token: (OPTIONAL) authorization token to fetch device details from IoTC.
            MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
        api_version: accepted for interface compatibility; the request is always
            issued with API_VERSION
        central_dns_suffix: {centralDnsSuffixInPath} as found in docs

    Returns:
        whatever _utility.make_api_call returns for the request
    """
    certificate_url = "https://{}.{}/{}/{}/certificates/{}".format(
        app_id, central_dns_suffix, BASE_PATH, group_id, certificate_entry)
    # The api_version argument is not forwarded; the call is pinned to API_VERSION.
    response = _utility.make_api_call(
        cmd,
        app_id=app_id,
        method="DELETE",
        url=certificate_url,
        payload=None,
        token=token,
        api_version=API_VERSION,
        central_dnx_suffix=central_dns_suffix,
    )
    return response
def verify_x509(
    cmd,
    app_id: str,
    group_id: str,
    primary_cert: str,
    secondary_cert: str,
    token: str,
    api_version=API_VERSION,
    central_dns_suffix=CENTRAL_ENDPOINT,
) -> dict:
    """
    Verify the primary or secondary x509 certificate of an enrollment group
    by providing a certificate with the signed verification code.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming request URL)
        group_id: case sensitive enrollment group id
        primary_cert: certificate for the 'primary' entry; takes precedence
            whenever it is not None
        secondary_cert: certificate for the 'secondary' entry; used only when
            primary_cert is None
        token: (OPTIONAL) authorization token to fetch device details from IoTC.
            MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
        api_version: accepted for interface compatibility; the request is always
            issued with API_VERSION
        central_dns_suffix: {centralDnsSuffixInPath} as found in docs

    Returns:
        whatever _utility.make_api_call returns for the request
    """
    # Choose the certificate slot and its content together.
    if primary_cert is not None:
        entry, certificate = 'primary', primary_cert
    else:
        entry, certificate = 'secondary', secondary_cert

    payload = {"certificate": certificate}

    verify_url = "https://{}.{}/{}/{}/certificates/{}/verify".format(
        app_id, central_dns_suffix, BASE_PATH, group_id, entry)
    # The api_version argument is not forwarded; the call is pinned to API_VERSION.
    response = _utility.make_api_call(
        cmd,
        app_id=app_id,
        method="POST",
        url=verify_url,
        payload=payload,
        token=token,
        api_version=API_VERSION,
        central_dnx_suffix=central_dns_suffix,
    )
    return response
def generate_verification_code(
    cmd,
    app_id: str,
    group_id: str,
    certificate_entry: str,
    token: str,
    api_version=API_VERSION,
    central_dns_suffix=CENTRAL_ENDPOINT,
) -> dict:
    """
    Generate a verification code for the primary or secondary x509 certificate
    of an enrollment group.

    Args:
        cmd: command passed into az
        app_id: name of app (used for forming request URL)
        group_id: case sensitive enrollment group id
        certificate_entry: entry of certificate, only support primary and
            secondary; used as the certificate segment of the request URL
        token: (OPTIONAL) authorization token to fetch device details from IoTC.
            MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
        api_version: accepted for interface compatibility; the request is always
            issued with API_VERSION
        central_dns_suffix: {centralDnsSuffixInPath} as found in docs

    Returns:
        whatever _utility.make_api_call returns for the request
    """
    code_url = "https://{}.{}/{}/{}/certificates/{}/generateVerificationCode".format(
        app_id, central_dns_suffix, BASE_PATH, group_id, certificate_entry)
    # The api_version argument is not forwarded; the call is pinned to API_VERSION.
    response = _utility.make_api_call(
        cmd,
        app_id=app_id,
        method="POST",
        url=code_url,
        payload=None,
        token=token,
        api_version=API_VERSION,
        central_dnx_suffix=central_dns_suffix,
    )
    return response