azext_iot/operations/dps.py (1,069 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from knack.log import get_logger
from azure.cli.core.azclierror import (
ArgumentUsageError,
AzureResponseError,
BadRequestError,
InvalidArgumentValueError,
MutuallyExclusiveArgumentError,
RequiredArgumentMissingError,
ResourceNotFoundError,
)
from azext_iot.common._azure import IOT_SERVICE_CS_TEMPLATE
from azext_iot.common.shared import (
SdkType,
AttestationType,
ReprovisionType,
AllocationType,
KeyType,
IoTDPSStateType
)
from azext_iot.common.utility import compute_device_key, handle_service_exception, shell_safe_json_parse
from azext_iot.common.certops import open_certificate
from azext_iot.dps.providers.discovery import DPSDiscovery
from azext_iot.operations.generic import _execute_query
from azext_iot._factory import SdkResolver
from azext_iot.sdk.dps.service.models import (
IndividualEnrollment,
CustomAllocationDefinition,
AttestationMechanism,
TpmAttestation,
SymmetricKeyAttestation,
X509Attestation,
X509Certificates,
X509CertificateWithInfo,
InitialTwin,
TwinCollection,
InitialTwinProperties,
EnrollmentGroup,
X509CAReferences,
ReprovisionPolicy,
DeviceCapabilities,
ProvisioningServiceErrorDetailsException,
)
logger = get_logger(__name__)
# DPS Enrollments
def iot_dps_device_enrollment_list(
cmd,
dps_name=None,
resource_group_name=None,
top=None,
login=None,
auth_type_dataplane=None,
):
from azext_iot.sdk.dps.service.models import QuerySpecification
discovery = DPSDiscovery(cmd)
target = discovery.get_target(
dps_name,
resource_group_name,
login=login,
auth_type=auth_type_dataplane,
)
try:
resolver = SdkResolver(target=target)
sdk = resolver.get_sdk(SdkType.dps_sdk)
query_command = "SELECT *"
query = [QuerySpecification(query=query_command)]
return _execute_query(query, sdk.individual_enrollment.query, top)
except ProvisioningServiceErrorDetailsException as e:
handle_service_exception(e)
def iot_dps_device_enrollment_get(
cmd,
enrollment_id,
dps_name=None,
resource_group_name=None,
show_keys=None,
login=None,
auth_type_dataplane=None,
):
discovery = DPSDiscovery(cmd)
target = discovery.get_target(
dps_name,
resource_group_name,
login=login,
auth_type=auth_type_dataplane,
)
try:
resolver = SdkResolver(target=target)
sdk = resolver.get_sdk(SdkType.dps_sdk)
enrollment = sdk.individual_enrollment.get(
enrollment_id, raw=True
).response.json()
if show_keys:
enrollment_type = enrollment["attestation"]["type"]
if enrollment_type == AttestationType.symmetricKey.value:
attestation = sdk.individual_enrollment.get_attestation_mechanism(
enrollment_id, raw=True
).response.json()
enrollment["attestation"] = attestation
else:
logger.warning(
"--show-keys argument was provided, but requested enrollment has an attestation type of '{}'."
" Currently, --show-keys is only supported for symmetric key enrollments".format(
enrollment_type
)
)
return enrollment
except ProvisioningServiceErrorDetailsException as e:
handle_service_exception(e)
def iot_dps_device_enrollment_create(
cmd,
enrollment_id,
attestation_type,
dps_name=None,
resource_group_name=None,
endorsement_key=None,
certificate_path=None,
secondary_certificate_path=None,
primary_key=None,
secondary_key=None,
device_id=None,
iot_hub_host_name=None,
initial_twin_tags=None,
initial_twin_properties=None,
provisioning_status=None,
reprovision_policy=None,
allocation_policy=None,
iot_hubs=None,
edge_enabled=False,
webhook_url=None,
device_information=None,
api_version=None,
login=None,
auth_type_dataplane=None,
):
discovery = DPSDiscovery(cmd)
target = discovery.get_target(
dps_name,
resource_group_name,
login=login,
auth_type=auth_type_dataplane,
)
try:
resolver = SdkResolver(target=target)
sdk = resolver.get_sdk(SdkType.dps_sdk)
if attestation_type == AttestationType.tpm.value:
if not endorsement_key:
raise RequiredArgumentMissingError("Endorsement key [--endorsement-key] is required")
attestation = AttestationMechanism(
type=AttestationType.tpm.value,
tpm=TpmAttestation(endorsement_key=endorsement_key),
)
if attestation_type == AttestationType.x509.value:
attestation = _get_attestation_with_x509_client_cert(
certificate_path, secondary_certificate_path
)
if attestation_type == AttestationType.symmetricKey.value:
attestation = AttestationMechanism(
type=AttestationType.symmetricKey.value,
symmetric_key=SymmetricKeyAttestation(
primary_key=primary_key, secondary_key=secondary_key
),
)
reprovision = _get_reprovision_policy(reprovision_policy)
initial_twin = _get_initial_twin(initial_twin_tags, initial_twin_properties)
iot_hub_list = iot_hubs.split() if isinstance(iot_hubs, str) else iot_hubs
_validate_allocation_policy_for_enrollment(
allocation_policy, iot_hub_host_name, iot_hub_list, webhook_url, api_version
)
if iot_hub_host_name and allocation_policy is None:
allocation_policy = AllocationType.static.value
iot_hub_list = iot_hub_host_name.split()
custom_allocation_definition = (
CustomAllocationDefinition(webhook_url=webhook_url, api_version=api_version)
if allocation_policy == AllocationType.custom.value
else None
)
capabilities = DeviceCapabilities(iot_edge=edge_enabled)
enrollment = IndividualEnrollment(
registration_id=enrollment_id,
attestation=attestation,
capabilities=capabilities,
device_id=device_id,
initial_twin=initial_twin,
provisioning_status=provisioning_status,
reprovision_policy=reprovision,
allocation_policy=allocation_policy,
iot_hubs=iot_hub_list,
custom_allocation_definition=custom_allocation_definition,
optional_device_information=_get_twin_collection(device_information)
)
return sdk.individual_enrollment.create_or_update(enrollment_id, enrollment)
except ProvisioningServiceErrorDetailsException as e:
handle_service_exception(e)
def iot_dps_device_enrollment_update(
cmd,
enrollment_id,
dps_name=None,
resource_group_name=None,
etag=None,
endorsement_key=None,
certificate_path=None,
secondary_certificate_path=None,
remove_certificate=None,
remove_secondary_certificate=None,
primary_key=None,
secondary_key=None,
device_id=None,
iot_hub_host_name=None,
initial_twin_tags=None,
initial_twin_properties=None,
provisioning_status=None,
reprovision_policy=None,
allocation_policy=None,
iot_hubs=None,
edge_enabled=None,
webhook_url=None,
device_information=None,
api_version=None,
login=None,
auth_type_dataplane=None,
):
discovery = DPSDiscovery(cmd)
target = discovery.get_target(
dps_name,
resource_group_name,
login=login,
auth_type=auth_type_dataplane,
)
try:
resolver = SdkResolver(target=target)
sdk = resolver.get_sdk(SdkType.dps_sdk)
enrollment_record = sdk.individual_enrollment.get(enrollment_id)
# Verify and update attestation information
attestation_type = enrollment_record.attestation.type
_validate_arguments_for_attestation_mechanism(
attestation_type,
endorsement_key,
certificate_path,
secondary_certificate_path,
remove_certificate,
remove_secondary_certificate,
primary_key,
secondary_key,
)
if attestation_type == AttestationType.tpm.value:
if endorsement_key:
enrollment_record.attestation.tpm.endorsement_key = endorsement_key
elif attestation_type == AttestationType.x509.value:
enrollment_record.attestation = _get_updated_attestation_with_x509_client_cert(
enrollment_record.attestation,
certificate_path,
secondary_certificate_path,
remove_certificate,
remove_secondary_certificate,
)
else:
enrollment_record.attestation = sdk.individual_enrollment.get_attestation_mechanism(
enrollment_id
)
if primary_key:
enrollment_record.attestation.symmetric_key.primary_key = primary_key
if secondary_key:
enrollment_record.attestation.symmetric_key.secondary_key = (
secondary_key
)
# Update enrollment information
if iot_hub_host_name:
enrollment_record.allocation_policy = AllocationType.static.value
enrollment_record.iot_hubs = iot_hub_host_name.split()
enrollment_record.iot_hub_host_name = None
if device_id:
enrollment_record.device_id = device_id
if provisioning_status:
enrollment_record.provisioning_status = provisioning_status
enrollment_record.registrationState = None
if reprovision_policy:
enrollment_record.reprovision_policy = _get_reprovision_policy(
reprovision_policy
)
enrollment_record.initial_twin = _get_updated_inital_twin(
enrollment_record, initial_twin_tags, initial_twin_properties
)
iot_hub_list = iot_hubs.split() if isinstance(iot_hubs, str) else iot_hubs
_validate_allocation_policy_for_enrollment(
allocation_policy,
iot_hub_host_name,
iot_hub_list,
webhook_url,
api_version,
current_enrollment=enrollment_record
)
if iot_hub_list:
enrollment_record.iot_hubs = iot_hub_list
enrollment_record.iot_hub_host_name = None
if allocation_policy:
enrollment_record.allocation_policy = allocation_policy
if enrollment_record.allocation_policy == AllocationType.custom.value and any([
webhook_url, api_version
]):
enrollment_record.custom_allocation_definition = CustomAllocationDefinition(
webhook_url=webhook_url or enrollment_record.custom_allocation_definition.webhook_url,
api_version=api_version or enrollment_record.custom_allocation_definition.api_version
)
if edge_enabled is not None:
enrollment_record.capabilities = DeviceCapabilities(iot_edge=edge_enabled)
if device_information:
enrollment_record.optional_device_information = _get_twin_collection(device_information)
return sdk.individual_enrollment.create_or_update(
enrollment_id, enrollment_record, if_match=(etag if etag else "*")
)
except ProvisioningServiceErrorDetailsException as e:
handle_service_exception(e)
def iot_dps_device_enrollment_delete(
cmd,
enrollment_id,
dps_name=None,
resource_group_name=None,
etag=None,
login=None,
auth_type_dataplane=None,
):
discovery = DPSDiscovery(cmd)
target = discovery.get_target(
dps_name,
resource_group_name,
login=login,
auth_type=auth_type_dataplane,
)
try:
resolver = SdkResolver(target=target)
sdk = resolver.get_sdk(SdkType.dps_sdk)
return sdk.individual_enrollment.delete(enrollment_id, if_match=(etag if etag else "*"))
except ProvisioningServiceErrorDetailsException as e:
handle_service_exception(e)
# DPS Enrollments Group
def iot_dps_device_enrollment_group_list(
cmd, dps_name=None, resource_group_name=None, top=None, login=None, auth_type_dataplane=None,
):
from azext_iot.sdk.dps.service.models import QuerySpecification
discovery = DPSDiscovery(cmd)
target = discovery.get_target(
dps_name,
resource_group_name,
login=login,
auth_type=auth_type_dataplane,
)
try:
resolver = SdkResolver(target=target)
sdk = resolver.get_sdk(SdkType.dps_sdk)
query_command = "SELECT *"
query1 = [QuerySpecification(query=query_command)]
return _execute_query(query1, sdk.enrollment_group.query, top)
except ProvisioningServiceErrorDetailsException as e:
handle_service_exception(e)
def iot_dps_device_enrollment_group_get(
cmd,
enrollment_id,
dps_name=None,
resource_group_name=None,
show_keys=None,
login=None,
auth_type_dataplane=None,
):
discovery = DPSDiscovery(cmd)
target = discovery.get_target(
dps_name,
resource_group_name,
login=login,
auth_type=auth_type_dataplane,
)
try:
resolver = SdkResolver(target=target)
sdk = resolver.get_sdk(SdkType.dps_sdk)
enrollment_group = sdk.enrollment_group.get(
enrollment_id, raw=True
).response.json()
if show_keys:
enrollment_type = enrollment_group["attestation"]["type"]
if enrollment_type == AttestationType.symmetricKey.value:
attestation = sdk.enrollment_group.get_attestation_mechanism(
enrollment_id, raw=True
).response.json()
enrollment_group["attestation"] = attestation
else:
logger.warning(
"--show-keys argument was provided, but requested enrollment group has an attestation type of '{}'."
" Currently, --show-keys is only supported for symmetric key enrollment groups".format(
enrollment_type
)
)
return enrollment_group
except ProvisioningServiceErrorDetailsException as e:
handle_service_exception(e)
def iot_dps_device_enrollment_group_create(
cmd,
enrollment_id,
dps_name=None,
resource_group_name=None,
certificate_path=None,
secondary_certificate_path=None,
root_ca_name=None,
secondary_root_ca_name=None,
primary_key=None,
secondary_key=None,
iot_hub_host_name=None,
initial_twin_tags=None,
initial_twin_properties=None,
provisioning_status=None,
reprovision_policy=None,
allocation_policy=None,
iot_hubs=None,
edge_enabled=False,
webhook_url=None,
api_version=None,
login=None,
auth_type_dataplane=None,
):
discovery = DPSDiscovery(cmd)
target = discovery.get_target(
dps_name,
resource_group_name,
login=login,
auth_type=auth_type_dataplane,
)
try:
resolver = SdkResolver(target=target)
sdk = resolver.get_sdk(SdkType.dps_sdk)
if not certificate_path and not secondary_certificate_path:
if not root_ca_name and not secondary_root_ca_name:
attestation = AttestationMechanism(
type=AttestationType.symmetricKey.value,
symmetric_key=SymmetricKeyAttestation(
primary_key=primary_key, secondary_key=secondary_key
),
)
if certificate_path or secondary_certificate_path:
if root_ca_name or secondary_root_ca_name:
raise MutuallyExclusiveArgumentError(
"Please provide either certificate path or certficate name"
)
attestation = _get_attestation_with_x509_signing_cert(
certificate_path, secondary_certificate_path
)
if root_ca_name or secondary_root_ca_name:
if certificate_path or secondary_certificate_path:
raise MutuallyExclusiveArgumentError(
"Please provide either certificate path or certficate name"
)
attestation = _get_attestation_with_x509_ca_cert(
root_ca_name, secondary_root_ca_name
)
reprovision = _get_reprovision_policy(reprovision_policy)
initial_twin = _get_initial_twin(initial_twin_tags, initial_twin_properties)
iot_hub_list = iot_hubs.split() if isinstance(iot_hubs, str) else iot_hubs
_validate_allocation_policy_for_enrollment(
allocation_policy, iot_hub_host_name, iot_hub_list, webhook_url, api_version
)
if iot_hub_host_name and allocation_policy is None:
allocation_policy = AllocationType.static.value
iot_hub_list = iot_hub_host_name.split()
custom_allocation_definition = (
CustomAllocationDefinition(webhook_url=webhook_url, api_version=api_version)
if allocation_policy == AllocationType.custom.value
else None
)
capabilities = DeviceCapabilities(iot_edge=edge_enabled)
group_enrollment = EnrollmentGroup(
enrollment_group_id=enrollment_id,
attestation=attestation,
capabilities=capabilities,
initial_twin=initial_twin,
provisioning_status=provisioning_status,
reprovision_policy=reprovision,
allocation_policy=allocation_policy,
iot_hubs=iot_hub_list,
custom_allocation_definition=custom_allocation_definition,
)
return sdk.enrollment_group.create_or_update(enrollment_id, group_enrollment)
except ProvisioningServiceErrorDetailsException as e:
handle_service_exception(e)
def iot_dps_device_enrollment_group_update(
cmd,
enrollment_id,
dps_name=None,
resource_group_name=None,
etag=None,
certificate_path=None,
secondary_certificate_path=None,
root_ca_name=None,
secondary_root_ca_name=None,
remove_certificate=None,
remove_secondary_certificate=None,
primary_key=None,
secondary_key=None,
iot_hub_host_name=None,
initial_twin_tags=None,
initial_twin_properties=None,
provisioning_status=None,
reprovision_policy=None,
allocation_policy=None,
iot_hubs=None,
edge_enabled=None,
webhook_url=None,
api_version=None,
login=None,
auth_type_dataplane=None,
):
discovery = DPSDiscovery(cmd)
target = discovery.get_target(
dps_name,
resource_group_name,
login=login,
auth_type=auth_type_dataplane,
)
try:
resolver = SdkResolver(target=target)
sdk = resolver.get_sdk(SdkType.dps_sdk)
enrollment_record = sdk.enrollment_group.get(enrollment_id)
# Update enrollment information
if enrollment_record.attestation.type == AttestationType.symmetricKey.value:
enrollment_record.attestation = sdk.enrollment_group.get_attestation_mechanism(
enrollment_id
)
if primary_key:
enrollment_record.attestation.symmetric_key.primary_key = primary_key
if secondary_key:
enrollment_record.attestation.symmetric_key.secondary_key = (
secondary_key
)
if enrollment_record.attestation.type == AttestationType.x509.value:
if not certificate_path and not secondary_certificate_path:
if not root_ca_name and not secondary_root_ca_name:
# Check if certificate can be safely removed while no new certificate has been provided
if remove_certificate and remove_secondary_certificate:
raise RequiredArgumentMissingError("Please provide at least one certificate")
if not _can_remove_primary_certificate(
remove_certificate, enrollment_record.attestation
):
raise RequiredArgumentMissingError(
"Please provide at least one certificate while removing the only primary certificate"
)
if not _can_remove_secondary_certificate(
remove_secondary_certificate, enrollment_record.attestation
):
raise RequiredArgumentMissingError(
"Please provide at least one certificate while removing the only secondary certificate"
)
if certificate_path or secondary_certificate_path:
if root_ca_name or secondary_root_ca_name:
raise MutuallyExclusiveArgumentError(
"Please provide either certificate path or certficate name"
)
enrollment_record.attestation = _get_updated_attestation_with_x509_signing_cert(
enrollment_record.attestation,
certificate_path,
secondary_certificate_path,
remove_certificate,
remove_secondary_certificate,
)
if root_ca_name or secondary_root_ca_name:
if certificate_path or secondary_certificate_path:
raise MutuallyExclusiveArgumentError(
"Please provide either certificate path or certficate name"
)
enrollment_record.attestation = _get_updated_attestation_with_x509_ca_cert(
enrollment_record.attestation,
root_ca_name,
secondary_root_ca_name,
remove_certificate,
remove_secondary_certificate,
)
if iot_hub_host_name:
enrollment_record.allocation_policy = AllocationType.static.value
enrollment_record.iot_hubs = iot_hub_host_name.split()
enrollment_record.iot_hub_host_name = None
if provisioning_status:
enrollment_record.provisioning_status = provisioning_status
if reprovision_policy:
enrollment_record.reprovision_policy = _get_reprovision_policy(
reprovision_policy
)
enrollment_record.initial_twin = _get_updated_inital_twin(
enrollment_record, initial_twin_tags, initial_twin_properties
)
iot_hub_list = iot_hubs.split() if isinstance(iot_hubs, str) else iot_hubs
_validate_allocation_policy_for_enrollment(
allocation_policy,
iot_hub_host_name,
iot_hub_list,
webhook_url,
api_version,
current_enrollment=enrollment_record
)
if iot_hub_list:
enrollment_record.iot_hubs = iot_hub_list
enrollment_record.iot_hub_host_name = None
if allocation_policy:
enrollment_record.allocation_policy = allocation_policy
if enrollment_record.allocation_policy == AllocationType.custom.value and any([
webhook_url, api_version
]):
enrollment_record.custom_allocation_definition = CustomAllocationDefinition(
webhook_url=webhook_url or enrollment_record.custom_allocation_definition.webhook_url,
api_version=api_version or enrollment_record.custom_allocation_definition.api_version
)
if edge_enabled is not None:
enrollment_record.capabilities = DeviceCapabilities(iot_edge=edge_enabled)
return sdk.enrollment_group.create_or_update(
enrollment_id, enrollment_record, if_match=(etag if etag else "*")
)
except ProvisioningServiceErrorDetailsException as e:
handle_service_exception(e)
def iot_dps_device_enrollment_group_delete(
cmd,
enrollment_id,
dps_name=None,
resource_group_name=None,
etag=None,
login=None,
auth_type_dataplane=None,
):
discovery = DPSDiscovery(cmd)
target = discovery.get_target(
dps_name,
resource_group_name,
login=login,
auth_type=auth_type_dataplane,
)
try:
resolver = SdkResolver(target=target)
sdk = resolver.get_sdk(SdkType.dps_sdk)
return sdk.enrollment_group.delete(enrollment_id, if_match=(etag if etag else "*"))
except ProvisioningServiceErrorDetailsException as e:
handle_service_exception(e)
def iot_dps_compute_device_key(
cmd,
registration_id,
enrollment_id=None,
dps_name=None,
resource_group_name=None,
symmetric_key=None,
login=None,
auth_type_dataplane=None,
):
if symmetric_key is None:
if not all([dps_name, enrollment_id]):
raise RequiredArgumentMissingError(
"Please provide DPS enrollment group identifiers (Device Provisioning Service name via "
"--dps-name and Enrollment ID via --enrollment-id) or the enrollment group symmetric key "
"via --symmetric-key or --key."
)
discovery = DPSDiscovery(cmd)
target = discovery.get_target(
dps_name,
resource_group_name,
login=login,
auth_type=auth_type_dataplane,
)
try:
resolver = SdkResolver(target=target)
sdk = resolver.get_sdk(SdkType.dps_sdk)
attestation = sdk.enrollment_group.get_attestation_mechanism(
enrollment_id, raw=True
).response.json()
if attestation.get("type") != AttestationType.symmetricKey.value:
raise BadRequestError(
"Requested enrollment group has an attestation type of '{}'. Currently, compute-device-key "
"is only supported for enrollment groups with symmetric key attestation type.".format(
attestation.get("type")
)
)
symmetric_key = attestation["symmetricKey"]["primaryKey"]
except ProvisioningServiceErrorDetailsException as e:
raise AzureResponseError(e)
return compute_device_key(
primary_key=symmetric_key, registration_id=registration_id
)
# DPS Connection strings
def iot_dps_connection_string_show(
cmd,
dps_name=None,
resource_group_name=None,
policy_name="provisioningserviceowner",
key_type=KeyType.primary.value,
show_all=False,
):
discovery = DPSDiscovery(cmd)
if dps_name is None:
dps = discovery.get_resources(resource_group_name)
if dps is None:
raise ResourceNotFoundError("No Device Provisioning Service found.")
def conn_str_getter(dps):
return _get_dps_connection_string(
discovery, dps, policy_name, key_type, show_all
)
connection_strings = []
for dps in dps:
if dps.properties.state == IoTDPSStateType.Active.value:
try:
connection_strings.append(
{
"name": dps.name,
"connectionString": conn_str_getter(dps)
if show_all
else conn_str_getter(dps)[0],
}
)
except Exception:
logger.warning(
f"Warning: The DPS {dps.name} in resource group "
+ f"{dps.additional_properties['resourcegroup']} does "
+ f"not have the target policy {policy_name}."
)
else:
logger.warning(
f"Warning: The DPS {dps.name} in resource group "
+ f"{dps.additional_properties['resourcegroup']} is skipped "
+ "because the DPS is not active."
)
return connection_strings
dps = discovery.find_resource(dps_name, resource_group_name)
if dps:
conn_str = _get_dps_connection_string(
discovery, dps, policy_name, key_type, show_all
)
return {"connectionString": conn_str if show_all else conn_str[0]}
def _get_dps_connection_string(
discovery, dps, policy_name, key_type, show_all
):
policies = []
if show_all:
policies.extend(
discovery.get_policies(dps.name, dps.additional_properties["resourcegroup"])
)
else:
policies.append(
discovery.find_policy(
dps.name, dps.additional_properties["resourcegroup"], policy_name
)
)
hostname = dps.properties.service_operations_host_name
return [
IOT_SERVICE_CS_TEMPLATE.format(
hostname,
p.key_name,
p.secondary_key if key_type == KeyType.secondary.value else p.primary_key,
)
for p in policies
]
# DPS Registration
def iot_dps_registration_list(
cmd,
enrollment_id,
dps_name=None,
resource_group_name=None,
top=None,
login=None,
auth_type_dataplane=None,
):
discovery = DPSDiscovery(cmd)
target = discovery.get_target(
dps_name,
resource_group_name,
login=login,
auth_type=auth_type_dataplane,
)
try:
resolver = SdkResolver(target=target)
sdk = resolver.get_sdk(SdkType.dps_sdk)
return _execute_query([enrollment_id], sdk.device_registration_state.query, top)
except ProvisioningServiceErrorDetailsException as e:
handle_service_exception(e)
def iot_dps_registration_get(
cmd, registration_id, dps_name=None, resource_group_name=None, login=None, auth_type_dataplane=None,
):
discovery = DPSDiscovery(cmd)
target = discovery.get_target(
dps_name,
resource_group_name,
login=login,
auth_type=auth_type_dataplane,
)
try:
resolver = SdkResolver(target=target)
sdk = resolver.get_sdk(SdkType.dps_sdk)
return sdk.device_registration_state.get(
registration_id, raw=True
).response.json()
except ProvisioningServiceErrorDetailsException as e:
handle_service_exception(e)
def iot_dps_registration_delete(
cmd,
registration_id,
dps_name=None,
resource_group_name=None,
etag=None,
login=None,
auth_type_dataplane=None,
):
discovery = DPSDiscovery(cmd)
target = discovery.get_target(
dps_name,
resource_group_name,
login=login,
auth_type=auth_type_dataplane,
)
try:
resolver = SdkResolver(target=target)
sdk = resolver.get_sdk(SdkType.dps_sdk)
return sdk.device_registration_state.delete(registration_id, if_match=(etag if etag else "*"))
except ProvisioningServiceErrorDetailsException as e:
handle_service_exception(e)
def _get_twin_collection(properties):
"""Convert a json into TwinCollection for use with the API."""
from azext_iot.common.utility import dict_clean
if properties == "":
properties = None
elif properties:
properties = dict_clean(shell_safe_json_parse(str(properties)))
return TwinCollection(additional_properties=properties)
def _get_initial_twin(initial_twin_tags=None, initial_twin_properties=None):
"""Build up Inital Twin using given tags and properties."""
return InitialTwin(
tags=_get_twin_collection(initial_twin_tags),
properties=InitialTwinProperties(
desired=_get_twin_collection(initial_twin_properties)
),
)
def _get_updated_inital_twin(
enrollment_record, initial_twin_tags=None, initial_twin_properties=None
):
if initial_twin_properties != "" and not initial_twin_tags:
if hasattr(enrollment_record, "initial_twin"):
if hasattr(enrollment_record.initial_twin, "tags"):
initial_twin_tags = enrollment_record.initial_twin.tags
if initial_twin_properties != "" and not initial_twin_properties:
if hasattr(enrollment_record, "initial_twin"):
if hasattr(enrollment_record.initial_twin, "properties"):
if hasattr(enrollment_record.initial_twin.properties, "desired"):
initial_twin_properties = (
enrollment_record.initial_twin.properties.desired
)
return _get_initial_twin(initial_twin_tags, initial_twin_properties)
def _get_x509_certificate(certificate_path, secondary_certificate_path):
x509certificate = X509Certificates(
primary=_get_certificate_info(certificate_path),
secondary=_get_certificate_info(secondary_certificate_path),
)
return x509certificate
def _get_certificate_info(certificate_path):
if not certificate_path:
return None
certificate_content = open_certificate(certificate_path)
certificate_with_info = X509CertificateWithInfo(certificate=certificate_content)
return certificate_with_info
def _get_attestation_with_x509_client_cert(
primary_certificate_path, secondary_certificate_path
):
if not primary_certificate_path and not secondary_certificate_path:
raise RequiredArgumentMissingError("Please provide at least one certificate path")
certificate = _get_x509_certificate(
primary_certificate_path, secondary_certificate_path
)
x509Attestation = X509Attestation(client_certificates=certificate)
attestation = AttestationMechanism(
type=AttestationType.x509.value, x509=x509Attestation
)
return attestation
def _get_updated_attestation_with_x509_client_cert(
attestation,
primary_certificate_path,
secondary_certificate_path,
remove_primary_certificate,
remove_secondary_certificate,
):
if remove_primary_certificate:
attestation.x509.client_certificates.primary = None
if remove_secondary_certificate:
attestation.x509.client_certificates.secondary = None
if primary_certificate_path:
attestation.x509.client_certificates.primary = _get_certificate_info(
primary_certificate_path
)
if secondary_certificate_path:
attestation.x509.client_certificates.secondary = _get_certificate_info(
secondary_certificate_path
)
return attestation
def _get_attestation_with_x509_signing_cert(
primary_certificate_path, secondary_certificate_path
):
certificate = _get_x509_certificate(
primary_certificate_path, secondary_certificate_path
)
x509Attestation = X509Attestation(signing_certificates=certificate)
attestation = AttestationMechanism(
type=AttestationType.x509.value, x509=x509Attestation
)
return attestation
def _get_attestation_with_x509_ca_cert(root_ca_name, secondary_root_ca_name):
certificate = X509CAReferences(
primary=root_ca_name, secondary=secondary_root_ca_name
)
x509Attestation = X509Attestation(ca_references=certificate)
attestation = AttestationMechanism(
type=AttestationType.x509.value, x509=x509Attestation
)
return attestation
def _get_updated_attestation_with_x509_signing_cert(
attestation,
primary_certificate_path,
secondary_certificate_path,
remove_primary_certificate,
remove_secondary_certificate,
):
if hasattr(attestation.x509, "signing_certificates"):
if remove_primary_certificate:
attestation.x509.signing_certificates.primary = None
if remove_secondary_certificate:
attestation.x509.signing_certificates.secondary = None
if primary_certificate_path:
attestation.x509.signing_certificates.primary = _get_certificate_info(
primary_certificate_path
)
if secondary_certificate_path:
attestation.x509.signing_certificates.secondary = _get_certificate_info(
secondary_certificate_path
)
return attestation
return _get_attestation_with_x509_signing_cert(
primary_certificate_path, secondary_certificate_path
)
def _get_updated_attestation_with_x509_ca_cert(
attestation,
root_ca_name,
secondary_root_ca_name,
remove_primary_certificate,
remove_secondary_certificate,
):
if (
hasattr(attestation.x509, "ca_references")
and attestation.x509.ca_references is not None
):
if remove_primary_certificate:
attestation.x509.ca_references.primary = None
if remove_secondary_certificate:
attestation.x509.ca_references.secondary = None
if root_ca_name:
attestation.x509.ca_references.primary = root_ca_name
if secondary_root_ca_name:
attestation.x509.ca_references.secondary = secondary_root_ca_name
return attestation
return _get_attestation_with_x509_ca_cert(root_ca_name, secondary_root_ca_name)
def _can_remove_primary_certificate(remove_certificate, attestation):
if remove_certificate:
if hasattr(attestation.x509, "signing_certificates"):
if (
not hasattr(attestation.x509.signing_certificates, "secondary")
or not attestation.x509.signing_certificates.secondary
):
return False
if hasattr(attestation.x509, "ca_references"):
if (
not hasattr(attestation.x509.ca_references, "secondary")
or not attestation.x509.ca_references.secondary
):
return False
return True
def _can_remove_secondary_certificate(remove_certificate, attestation):
if remove_certificate:
if hasattr(attestation.x509, "signing_certificates"):
if (
not hasattr(attestation.x509.signing_certificates, "primary")
or not attestation.x509.signing_certificates.primary
):
return False
if hasattr(attestation.x509, "ca_references"):
if (
not hasattr(attestation.x509.ca_references, "primary")
or not attestation.x509.ca_references.primary
):
return False
return True
def _get_reprovision_policy(reprovision_policy):
if reprovision_policy:
if reprovision_policy == ReprovisionType.reprovisionandmigratedata.value:
reprovision = ReprovisionPolicy(
update_hub_assignment=True, migrate_device_data=True
)
elif reprovision_policy == ReprovisionType.reprovisionandresetdata.value:
reprovision = ReprovisionPolicy(
update_hub_assignment=True, migrate_device_data=False
)
elif reprovision_policy == ReprovisionType.never.value:
reprovision = ReprovisionPolicy(
update_hub_assignment=False, migrate_device_data=False
)
else:
raise InvalidArgumentValueError("Invalid Reprovision Policy.")
else:
reprovision = ReprovisionPolicy(
update_hub_assignment=True, migrate_device_data=True
)
return reprovision
def _validate_arguments_for_attestation_mechanism(
attestation_type,
endorsement_key,
certificate_path,
secondary_certificate_path,
remove_certificate,
remove_secondary_certificate,
primary_key,
secondary_key,
):
if attestation_type == AttestationType.tpm.value:
if certificate_path or secondary_certificate_path:
raise ArgumentUsageError(
"Cannot update certificate while enrollment is using tpm attestation mechanism"
)
if remove_certificate or remove_secondary_certificate:
raise ArgumentUsageError(
"Cannot remove certificate while enrollment is using tpm attestation mechanism"
)
if primary_key or secondary_key:
raise ArgumentUsageError(
"Cannot update primary or secondary key while enrollment is using tpm attestation mechanism"
)
elif attestation_type == AttestationType.x509.value:
if endorsement_key:
raise ArgumentUsageError(
"Cannot update endorsement key while enrollment is using x509 attestation mechanism"
)
if primary_key or secondary_key:
raise ArgumentUsageError(
"Cannot update primary or secondary key while enrollment is using x509 attestation mechanism"
)
else:
if certificate_path or secondary_certificate_path:
raise ArgumentUsageError(
"Cannot update certificate while enrollment is using symmetric key attestation mechanism"
)
if remove_certificate or remove_secondary_certificate:
raise ArgumentUsageError(
"Cannot remove certificate while enrollment is using symmetric key attestation mechanism"
)
if endorsement_key:
raise ArgumentUsageError(
"Cannot update endorsement key while enrollment is using symmetric key attestation mechanism"
)
def _validate_allocation_policy_for_enrollment(
allocation_policy, iot_hub_host_name, iot_hub_list, webhook_url, api_version, current_enrollment=None
):
# get the enrollment values if not provided but present
if current_enrollment:
iot_hub_list = iot_hub_list or current_enrollment.iot_hubs
allocation_policy = allocation_policy or current_enrollment.allocation_policy
if current_enrollment.allocation_policy == AllocationType.custom.value:
webhook_url = webhook_url or current_enrollment.custom_allocation_definition.webhook_url
api_version = api_version or current_enrollment.custom_allocation_definition.api_version
if allocation_policy:
if iot_hub_host_name is not None:
raise MutuallyExclusiveArgumentError(
"'iot_hub_host_name' is not required when allocation-policy is defined."
)
# Code to ensure geolatency still works after the enum fix.
if not any(
allocation_policy == allocation.value for allocation in AllocationType
):
raise RequiredArgumentMissingError("Please provide valid allocation policy.")
if allocation_policy == AllocationType.static.value:
if iot_hub_list is None:
raise RequiredArgumentMissingError("Please provide a hub to be assigned with device.")
if iot_hub_list and len(iot_hub_list) > 1:
raise InvalidArgumentValueError("Only one hub is required in static allocation policy.")
if allocation_policy == AllocationType.custom.value:
if webhook_url is None or api_version is None:
raise RequiredArgumentMissingError(
"Please provide both the Azure function webhook url and provisioning"
" service api-version when the allocation-policy is defined as Custom."
)
elif iot_hub_list and not current_enrollment:
raise RequiredArgumentMissingError("Please provide allocation policy.")