azext_iot/central/commands_enrollment_group.py (278 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# Dev note - think of this as a controller
import base64
from typing import List, Optional
from azext_iot.constants import CENTRAL_ENDPOINT
from azext_iot.central.providers import CentralEnrollmentGroupProvider
from azext_iot.common.certops import open_certificate
from azext_iot.central.common import API_VERSION
from azext_iot.central.models.ga_2022_07_31 import EnrollmentGroupGa
def get_enrollment_group(
cmd,
app_id: str,
group_id: str,
certificate_entry: Optional[str] = None,
token: Optional[str] = None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
) -> EnrollmentGroupGa:
provider = CentralEnrollmentGroupProvider(
cmd=cmd, app_id=app_id, token=token, api_version=api_version
)
response = provider.get_enrollment_group(
group_id=group_id,
central_dns_suffix=central_dns_suffix,
)
if certificate_entry:
response["x509"] = get_x509(
cmd=cmd,
app_id=app_id,
group_id=group_id,
certificate_entry=certificate_entry,
token=token,
central_dns_suffix=central_dns_suffix,
api_version=api_version,
)
return response
def list_enrollment_groups(
cmd,
app_id: str,
token=None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
) -> List[EnrollmentGroupGa]:
provider = CentralEnrollmentGroupProvider(
cmd=cmd, app_id=app_id, token=token, api_version=api_version
)
return provider.list_enrollment_groups(central_dns_suffix=central_dns_suffix)
def create_enrollment_group(
cmd,
app_id: str,
attestation: str,
display_name: str,
type: str,
group_id: str,
enabled: Optional[str] = 'enabled',
primary_key: Optional[str] = None,
secondary_key: Optional[str] = None,
primary_cert_path: Optional[str] = None,
secondary_cert_path: Optional[str] = None,
etag: Optional[str] = None,
token: Optional[str] = None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
) -> EnrollmentGroupGa:
provider = CentralEnrollmentGroupProvider(
cmd=cmd, app_id=app_id, token=token, api_version=api_version
)
response = provider.create_enrollment_group(
group_id=group_id,
attestation=attestation,
primary_key=primary_key,
secondary_key=secondary_key,
display_name=display_name,
type=type,
enabled=(enabled == 'enabled'),
etag=etag,
central_dns_suffix=central_dns_suffix,
)
# For x509 we need to call a separate API
primary_cert = None
secondary_cert = None
if attestation == 'x509':
if primary_cert_path:
primary_cert = open_certificate(primary_cert_path)
if primary_cert_path.endswith(".pem"):
primary_cert = base64.encodebytes((primary_cert.replace('\r', '') + '\n').encode()).decode().replace('\n', '')
if secondary_cert_path:
secondary_cert = open_certificate(secondary_cert_path)
if secondary_cert_path.endswith(".pem"):
secondary_cert = base64.encodebytes((secondary_cert.replace('\r', '') + '\n').encode()).decode().replace('\n', '')
if primary_cert_path or secondary_cert_path:
response["x509"] = create_x509(
cmd=cmd,
app_id=app_id,
group_id=group_id,
primary_cert=primary_cert,
secondary_cert=secondary_cert,
etag=etag,
token=token,
central_dns_suffix=central_dns_suffix,
api_version=api_version,
)
return response
def update_enrollment_group(
cmd,
app_id: str,
group_id: str,
display_name: Optional[str] = None,
type: Optional[str] = None,
remove_x509: Optional[bool] = None,
enabled: Optional[str] = 'enabled',
primary_cert_path: Optional[str] = None,
secondary_cert_path: Optional[str] = None,
certificate_entry: Optional[str] = None,
etag: Optional[str] = None,
token: Optional[str] = None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
):
provider = CentralEnrollmentGroupProvider(
cmd=cmd, app_id=app_id, token=token, api_version=api_version
)
response = provider.update_enrollment_group(
group_id=group_id,
display_name=display_name,
type=type,
enabled=(enabled == 'enabled'),
etag=etag,
central_dns_suffix=central_dns_suffix,
)
# Still can create/remove x509 during update
primary_cert = None
secondary_cert = None
if primary_cert_path:
primary_cert = open_certificate(primary_cert_path)
if primary_cert_path.endswith(".pem"):
primary_cert = base64.encodebytes((primary_cert.replace('\r', '') + '\n').encode()).decode().replace('\n', '')
if secondary_cert_path:
secondary_cert = open_certificate(secondary_cert_path)
if secondary_cert_path.endswith(".pem"):
secondary_cert = base64.encodebytes((secondary_cert.replace('\r', '') + '\n').encode()).decode().replace('\n', '')
if primary_cert_path or secondary_cert_path:
response["x509"] = create_x509(
cmd=cmd,
app_id=app_id,
group_id=group_id,
primary_cert=primary_cert,
secondary_cert=secondary_cert,
etag=etag,
token=token,
central_dns_suffix=central_dns_suffix,
api_version=api_version,
)
elif remove_x509 is True and certificate_entry:
# We need to remove x509 from the group
response["x509"] = {
"remove": delete_x509(
cmd=cmd,
app_id=app_id,
group_id=group_id,
certificate_entry=certificate_entry,
token=token,
central_dns_suffix=central_dns_suffix,
api_version=api_version,
)
}
return response
def delete_enrollment_group(
cmd,
app_id: str,
group_id: str,
token=None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
) -> dict:
provider = CentralEnrollmentGroupProvider(
cmd=cmd, app_id=app_id, token=token, api_version=api_version
)
return provider.delete_enrollment_group(
group_id=group_id,
central_dns_suffix=central_dns_suffix,
)
def create_x509(
cmd,
app_id: str,
group_id: str,
primary_cert: Optional[str] = None,
secondary_cert: Optional[str] = None,
etag: Optional[str] = None,
token: Optional[str] = None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
) -> dict:
provider = CentralEnrollmentGroupProvider(
cmd=cmd, app_id=app_id, token=token, api_version=api_version
)
return provider.create_x509(
group_id=group_id,
primary_cert=primary_cert,
secondary_cert=secondary_cert,
etag=etag,
central_dns_suffix=central_dns_suffix,
)
def verify_x509(
cmd,
app_id: str,
group_id: str,
primary_cert_path: Optional[str] = None,
secondary_cert_path: Optional[str] = None,
token: Optional[str] = None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
) -> dict:
provider = CentralEnrollmentGroupProvider(
cmd=cmd, app_id=app_id, token=token, api_version=api_version
)
primary_cert = None
secondary_cert = None
if primary_cert_path:
primary_cert = open_certificate(primary_cert_path)
if primary_cert_path.endswith(".pem"):
primary_cert = base64.encodebytes((primary_cert.replace('\r', '') + '\n').encode()).decode().replace('\n', '')
if secondary_cert_path:
secondary_cert = open_certificate(secondary_cert_path)
if secondary_cert_path.endswith(".pem"):
secondary_cert = base64.encodebytes((secondary_cert.replace('\r', '') + '\n').encode()).decode().replace('\n', '')
return provider.verify_x509(
group_id=group_id,
primary_cert=primary_cert,
secondary_cert=secondary_cert,
central_dns_suffix=central_dns_suffix,
)
def get_x509(
cmd,
app_id: str,
group_id: str,
certificate_entry: Optional[str] = None,
token: Optional[str] = None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
) -> dict:
provider = CentralEnrollmentGroupProvider(
cmd=cmd, app_id=app_id, token=token, api_version=api_version
)
return provider.get_x509(
group_id=group_id,
certificate_entry=certificate_entry,
central_dns_suffix=central_dns_suffix,
)
def delete_x509(
cmd,
app_id: str,
group_id: str,
certificate_entry: Optional[str] = None,
token: Optional[str] = None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
) -> dict:
provider = CentralEnrollmentGroupProvider(
cmd=cmd, app_id=app_id, token=token, api_version=api_version
)
return provider.delete_x509(
group_id=group_id,
certificate_entry=certificate_entry,
central_dns_suffix=central_dns_suffix,
)
def generate_verification_code(
cmd,
app_id: str,
group_id: str,
certificate_entry: Optional[str] = None,
token: Optional[str] = None,
central_dns_suffix=CENTRAL_ENDPOINT,
api_version=API_VERSION,
) -> dict:
provider = CentralEnrollmentGroupProvider(
cmd=cmd, app_id=app_id, token=token, api_version=api_version
)
return provider.generate_verification_code(
group_id=group_id,
certificate_entry=certificate_entry,
central_dns_suffix=central_dns_suffix,
)