azext_iot/central/commands_device.py (636 lines of code) (raw):

# coding=utf-8 # -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- # Dev note - think of this as a controller from azext_iot.central.common import API_VERSION, EDGE_ONLY_FILTER from azext_iot.central.models.devicetwin import DeviceTwin from azext_iot.central.models.edge import EdgeModule from azext_iot.central.providers import ( CentralDeviceProvider, CentralDeviceTemplateProvider, ) from typing import Optional, List, Any from azure.cli.core.azclierror import ( InvalidArgumentValueError, RequiredArgumentMissingError, ResourceNotFoundError, ForbiddenError, ) from azext_iot.central.models.ga_2022_07_31 import DeviceGa from azext_iot.common import utility from azext_iot.constants import CENTRAL_ENDPOINT from knack.log import get_logger logger = get_logger(__name__) def list_devices( cmd, app_id: str, edge_only=False, token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> List[DeviceGa]: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) devices = provider.list_devices( filter=EDGE_ONLY_FILTER if edge_only else None, central_dns_suffix=central_dns_suffix, ) if edge_only: template_provider = CentralDeviceTemplateProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) templates = {} filtered = [] for device in devices: template_id = get_template_id(device) if template_id is None: continue if template_id not in templates: templates[template_id] = template_provider.get_device_template( template_id, central_dns_suffix=central_dns_suffix ) template = templates[template_id] if "EdgeModel" in template.raw_template[template.get_type_key()]: filtered.append(device) return filtered return devices def get_device( cmd, app_id: str, device_id: str, token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> DeviceGa: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.get_device(device_id, central_dns_suffix=central_dns_suffix) def get_device_twin( cmd, app_id: str, device_id: str, token=None, central_dns_suffix=CENTRAL_ENDPOINT, ) -> DeviceTwin: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=API_VERSION ) return provider.get_device_twin( device_id, central_dns_suffix=central_dns_suffix ).device_twin def create_device( cmd, app_id: str, device_id: str, device_name=None, template=None, simulated=False, organizations=None, token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> DeviceGa: if simulated and not template: raise RequiredArgumentMissingError( "Error: if you supply --simulated you must also specify --template" ) provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.create_device( device_id=device_id, device_name=device_name, template=template, simulated=simulated, organizations=organizations, central_dns_suffix=central_dns_suffix, ) def update_device( cmd, app_id: str, device_id: str, device_name=None, template=None, simulated=None, enabled=None, organizations=None, token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> DeviceGa: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.update_device( device_id=device_id, device_name=device_name, template=template, simulated=simulated, enabled=enabled, organizations=organizations, central_dns_suffix=central_dns_suffix, ) def delete_device( cmd, app_id: str, device_id: str, token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.delete_device(device_id, central_dns_suffix=central_dns_suffix) def registration_info( cmd, app_id: str, device_id, token=None, api_version=API_VERSION, central_dns_suffix=CENTRAL_ENDPOINT, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.get_device_registration_info( device_id=device_id, central_dns_suffix=central_dns_suffix, device_status=None, ) def run_command( cmd, app_id: str, device_id: str, command_name: str, content: str, component_name: Optional[str] = None, module_name: Optional[str] = None, interface_id: Optional[str] = None, token: Optional[str] = None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) payload = utility.process_json_arg(content, argument_name="content") return provider.run_command( device_id=device_id, interface_id=interface_id, component_name=component_name, module_name=module_name, command_name=command_name, payload=payload, central_dns_suffix=central_dns_suffix, ) def run_manual_failover( cmd, app_id: str, device_id: str, ttl_minutes=None, token=None, api_version=API_VERSION, central_dns_suffix=CENTRAL_ENDPOINT, ) -> dict: if ttl_minutes and ttl_minutes < 1: raise InvalidArgumentValueError( "TTL value should be a positive integer: {}".format(ttl_minutes) ) provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.run_manual_failover( device_id=device_id, ttl_minutes=ttl_minutes, central_dns_suffix=central_dns_suffix, ) def run_manual_failback( cmd, app_id: str, device_id: str, token=None, api_version=API_VERSION, central_dns_suffix=CENTRAL_ENDPOINT, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.run_manual_failback( device_id=device_id, central_dns_suffix=central_dns_suffix ) def purge_c2d_messages( cmd, app_id: str, device_id: str, token=None, central_dns_suffix=CENTRAL_ENDPOINT, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=API_VERSION ) return provider.purge_c2d_messages( device_id=device_id, central_dns_suffix=central_dns_suffix ) def get_command_history( cmd, app_id: str, device_id: str, command_name: str, interface_id: Optional[str] = None, component_name: Optional[str] = None, module_name: Optional[str] = None, token: Optional[str] = None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.get_command_history( device_id=device_id, interface_id=interface_id, component_name=component_name, module_name=module_name, command_name=command_name, central_dns_suffix=central_dns_suffix, ) def list_children( cmd, app_id: str, device_id: str, token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> List[DeviceGa]: children = [] provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) # get iotedge device edge_twin = provider.get_device_twin( device_id=device_id, central_dns_suffix=central_dns_suffix ) edge_scope_id = edge_twin.device_twin.get("deviceScope") # list all application device twins devices = provider.list_devices(central_dns_suffix=central_dns_suffix) for device in devices: try: twin = provider.get_device_twin( device.id, central_dns_suffix=central_dns_suffix ) device_scope_id = twin.device_twin.get("deviceScope") if ( device_scope_id and device_scope_id == edge_scope_id and device.id != device_id # skip current device ): children.append(device) except Exception: pass return children def add_children( cmd, app_id: str, device_id: str, children_ids: List[str], token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ): from uuid import uuid4 provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return [ provider.add_relationship( device_id=device_id, target_id=child_id, rel_id=str(uuid4()), central_dns_suffix=central_dns_suffix, ) for child_id in children_ids ] def remove_children( cmd, app_id: str, device_id: str, children_ids: List[str], token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ): provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) rels = provider.list_relationships( device_id=device_id, central_dns_suffix=central_dns_suffix ) deleted = [] for rel in rels: if rel.target in children_ids: deleted.append( provider.delete_relationship( device_id=device_id, rel_id=rel.id, central_dns_suffix=central_dns_suffix, ) ) if not deleted: raise ForbiddenError(f"Childs {children_ids} cannot be removed.") return deleted def get_edge_device( cmd, app_id: str, device_id: str, token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> Any: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) def raise_error(): raise InvalidArgumentValueError( "The specified device Id does not identify as an IoT Edge device." ) # check if device is edge try: twin = provider.get_device_twin( device_id, central_dns_suffix=central_dns_suffix ) capabilities = twin.device_twin.get("capabilities") if not capabilities: raise_error() iot_edge = capabilities.get("iotEdge") if not iot_edge: raise_error() return provider.get_device( device_id=device_id, central_dns_suffix=central_dns_suffix ) except Exception: raise_error() def list_device_modules( cmd, app_id: str, device_id: str, token=None, central_dns_suffix=CENTRAL_ENDPOINT, ) -> List[EdgeModule]: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=API_VERSION ) return provider.list_device_modules( device_id, central_dns_suffix=central_dns_suffix ) def get_device_module( cmd, app_id: str, device_id: str, module_id: str, token=None, central_dns_suffix=CENTRAL_ENDPOINT, ) -> EdgeModule: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=API_VERSION ) modules = provider.list_device_modules( device_id, central_dns_suffix=central_dns_suffix ) for module in modules: if module.module_id == module_id: return module raise ResourceNotFoundError( f"A module named '{module_id}' does not exist on device {device_id} or is not currently available" ) def get_edge_manifest( cmd, app_id: str, device_id: str, token=None, central_dns_suffix=CENTRAL_ENDPOINT ): template_provider = CentralDeviceTemplateProvider( cmd=cmd, app_id=app_id, token=token, api_version=API_VERSION ) device = get_edge_device( cmd, app_id=app_id, device_id=device_id, token=token, central_dns_suffix=central_dns_suffix, ) template = template_provider.get_device_template( device.template, central_dns_suffix=central_dns_suffix ) return template.deployment_manifest def restart_device_module( cmd, app_id: str, device_id: str, module_id: str, token=None, central_dns_suffix=CENTRAL_ENDPOINT, ) -> List[EdgeModule]: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=API_VERSION ) return provider.restart_device_module( device_id, module_id, central_dns_suffix=central_dns_suffix ) def registration_summary( cmd, app_id: str, token=None, api_version=API_VERSION, central_dns_suffix=CENTRAL_ENDPOINT, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.get_device_registration_summary( central_dns_suffix=central_dns_suffix, ) def get_credentials( cmd, app_id: str, device_id, token=None, api_version=API_VERSION, central_dns_suffix=CENTRAL_ENDPOINT, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.get_device_credentials( device_id=device_id, central_dns_suffix=central_dns_suffix, ) def compute_device_key(cmd, primary_key, device_id): return utility.compute_device_key( primary_key=primary_key, registration_id=device_id ) def get_template_id(device: DeviceGa): return getattr(device, "template") def get_attestation( cmd, app_id: str, device_id: str, token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.get_device_attestation( device_id=device_id, central_dns_suffix=central_dns_suffix, ) def delete_attestation( cmd, app_id: str, device_id: str, token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.delete_device_attestation(device_id, central_dns_suffix=central_dns_suffix) def update_attestation( cmd, app_id: str, device_id: str, content: str, token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) payload = utility.process_json_arg(content, argument_name="content") return provider.update_device_attestation(device_id, payload=payload, central_dns_suffix=central_dns_suffix) def create_attestation( cmd, app_id: str, device_id: str, content: str, token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) payload = utility.process_json_arg(content, argument_name="content") return provider.create_device_attestation(device_id, payload=payload, central_dns_suffix=central_dns_suffix) def list_modules( cmd, app_id: str, device_id, token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.list_modules( device_id=device_id, central_dns_suffix=central_dns_suffix, ) def list_components( cmd, app_id: str, device_id, module_name=None, token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.list_device_components( device_id=device_id, module_name=module_name, central_dns_suffix=central_dns_suffix, ) def get_properties( cmd, app_id: str, device_id: str, component_name: Optional[str] = None, module_name: Optional[str] = None, token: Optional[str] = None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.get_device_properties( device_id=device_id, component_name=component_name, module_name=module_name, central_dns_suffix=central_dns_suffix, ) def update_properties( cmd, app_id: str, device_id, content: str, component_name: Optional[str] = None, module_name: Optional[str] = None, token: Optional[str] = None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) payload = utility.process_json_arg(content, argument_name="content") return provider.update_device_properties( device_id=device_id, payload=payload, component_name=component_name, module_name=module_name, central_dns_suffix=central_dns_suffix, ) def replace_properties( cmd, app_id: str, device_id, content: str, component_name: Optional[str] = None, module_name: Optional[str] = None, token: Optional[str] = None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) payload = utility.process_json_arg(content, argument_name="content") return provider.replace_device_properties( device_id=device_id, payload=payload, component_name=component_name, module_name=module_name, central_dns_suffix=central_dns_suffix, ) def get_telemetry_value( cmd, app_id: str, device_id, telemetry_name, component_name=None, module_name=None, token=None, central_dns_suffix=CENTRAL_ENDPOINT, api_version=API_VERSION, ) -> dict: provider = CentralDeviceProvider( cmd=cmd, app_id=app_id, token=token, api_version=api_version ) return provider.get_telemetry_value( device_id=device_id, component_name=component_name, module_name=module_name, telemetry_name=telemetry_name, central_dns_suffix=central_dns_suffix, )