azext_iot/central/providers/device_provider.py (697 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import List
from azure.cli.core.azclierror import (
AzureResponseError,
ClientRequestError,
CLIInternalError,
RequiredArgumentMissingError,
ResourceNotFoundError,
)
from knack.log import get_logger
from azext_iot.central.models.devicetwin import DeviceTwin
from azext_iot.central.models.edge import EdgeModule
from azext_iot.constants import CENTRAL_ENDPOINT
from azext_iot.central import services as central_services
from azext_iot.central.models.enum import DeviceStatus, ApiVersion
from azext_iot.central.models.ga_2022_07_31 import (DeviceGa, RelationshipGa)
from azext_iot.dps.services import global_service as dps_global_service
logger = get_logger(__name__)
MODEL = "Device"
class CentralDeviceProvider:
def __init__(self, cmd, app_id: str, api_version: str, token=None):
"""
Provider for device APIs
Args:
cmd: command passed into az
app_id: name of app (used for forming request URL)
api_version: API version (appendend to request URL)
token: (OPTIONAL) authorization token to fetch device details from IoTC.
MUST INCLUDE type (e.g. 'SharedAccessToken ...', 'Bearer ...')
Useful in scenarios where user doesn't own the app
therefore AAD token won't work, but a SAS token generated by owner will
"""
self._cmd = cmd
self._app_id = app_id
self._api_version = api_version
self._token = token
self._devices = {}
self._device_templates = {}
self._device_credentials = {}
self._device_registration_info = {}
def get_device(
self,
device_id,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> DeviceGa:
# get or add to cache
device = self._devices.get(device_id)
if not device:
device = central_services.device.get_device(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
token=self._token,
central_dns_suffix=central_dns_suffix,
api_version=self._api_version,
)
self._devices[device_id] = device
if not device:
raise ResourceNotFoundError(
"No device found with id: '{}'.".format(device_id)
)
return self._devices[device_id]
def list_devices(
self,
filter=None,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> List[DeviceGa]:
devices = central_services.device.list_devices(
cmd=self._cmd,
app_id=self._app_id,
token=self._token,
filter=filter,
central_dns_suffix=central_dns_suffix,
api_version=self._api_version,
)
# add to cache
self._devices.update({device.id: device for device in devices})
return devices
def create_device(
self,
device_id,
device_name=None,
template=None,
simulated=False,
organizations=None,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> DeviceGa:
if not device_id:
raise RequiredArgumentMissingError("Device id must be specified.")
if device_id in self._devices:
raise ClientRequestError("Device already exists.")
device = central_services.device.create_device(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
device_name=device_name,
template=template,
simulated=simulated,
organizations=organizations,
token=self._token,
central_dns_suffix=central_dns_suffix,
api_version=self._api_version,
)
if not device:
raise AzureResponseError(
"Failed to create device with id: '{}'.".format(device_id)
)
# add to cache
self._devices[device.id] = device
return self._devices[device.id]
def update_device(
self,
device_id,
device_name=None,
template=None,
simulated=None,
enabled=None,
organizations=None,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> DeviceGa:
if not device_id:
raise RequiredArgumentMissingError("Device id must be specified.")
if device_id in self._devices:
raise ClientRequestError("Device already exists.")
device = central_services.device.update_device(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
device_name=device_name,
template=template,
simulated=simulated,
enabled=enabled,
organizations=organizations,
token=self._token,
central_dns_suffix=central_dns_suffix,
api_version=self._api_version,
)
if not device:
raise ResourceNotFoundError(
"No device found with id: '{}'.".format(device_id)
)
# add to cache
self._devices[device.id] = device
return self._devices[device.id]
def delete_device(
self,
device_id,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> dict:
if not device_id:
raise RequiredArgumentMissingError("Device id must be specified.")
# get or add to cache
result = central_services.device.delete_device(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
token=self._token,
central_dns_suffix=central_dns_suffix,
api_version=self._api_version,
)
# remove from cache
# pop "miss" raises a KeyError if None is not provided
self._devices.pop(device_id, None)
self._device_credentials.pop(device_id, None)
return result
def list_relationships(
self,
device_id,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> List[RelationshipGa]:
relationships = central_services.device.list_relationships(
self._cmd,
app_id=self._app_id,
device_id=device_id,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
if relationships is None:
return []
return relationships
def add_relationship(
self,
device_id,
target_id,
rel_id,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> dict:
relationship = central_services.device.create_relationship(
self._cmd,
app_id=self._app_id,
device_id=device_id,
rel_id=rel_id,
target_id=target_id,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
if not relationship:
raise ResourceNotFoundError(
"No relationship found with id: '{}'.".format(rel_id)
)
return relationship
def update_relationship(
self,
device_id,
target_id,
rel_id,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> dict:
relationship = central_services.device.update_relationship(
self._cmd,
app_id=self._app_id,
device_id=device_id,
rel_id=rel_id,
target_id=target_id,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
if not relationship:
raise ResourceNotFoundError(
"No relationship found with id: '{}'.".format(rel_id)
)
return relationship
def delete_relationship(
self,
device_id,
rel_id,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> dict:
# get or add to cache
result = central_services.device.delete_relationship(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
rel_id=rel_id,
token=self._token,
central_dns_suffix=central_dns_suffix,
api_version=self._api_version,
)
return result
def get_device_credentials(
self,
device_id,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> dict:
credentials = self._device_credentials.get(device_id)
if not credentials:
credentials = central_services.device.get_device_credentials(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
token=self._token,
central_dns_suffix=central_dns_suffix,
api_version=self._api_version,
)
if not credentials:
raise CLIInternalError(
"Could not find device credentials for device '{}'.".format(device_id)
)
# add to cache
self._device_credentials[device_id] = credentials
return credentials
def get_device_registration_info(
self,
device_id,
device_status: DeviceStatus,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> dict:
dps_state = {}
info = self._device_registration_info.get(device_id)
if info:
return info
device = self.get_device(device_id, central_dns_suffix)
if device._device_status == DeviceStatus.provisioned:
credentials = self.get_device_credentials(
device_id=device_id,
central_dns_suffix=central_dns_suffix,
)
id_scope = credentials["idScope"]
key = credentials["symmetricKey"]["primaryKey"]
dps_state = dps_global_service.get_registration_state(
id_scope=id_scope, key=key, device_id=device_id
)
dps_state = self._dps_populate_essential_info(dps_state, device._device_status)
info = {
"@device_id": device_id,
"dps_state": dps_state,
"device_registration_info": device.get_registration_info(),
}
self._device_registration_info[device_id] = info
return info
def get_device_registration_summary(self, central_dns_suffix=CENTRAL_ENDPOINT):
return central_services.device.get_device_registration_summary(
cmd=self._cmd,
app_id=self._app_id,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
def run_command(
self,
device_id: str,
interface_id: str,
component_name: str,
module_name: str,
command_name: str,
payload: dict,
central_dns_suffix=CENTRAL_ENDPOINT,
):
if interface_id and self._is_interface_id_component(
device_id=device_id,
interface_id=interface_id,
central_dns_suffix=central_dns_suffix,
):
return central_services.device.run_command(
cmd=self._cmd,
app_id=self._app_id,
token=self._token,
device_id=device_id,
component_name=interface_id,
module_name=module_name,
command_name=command_name,
payload=payload,
central_dns_suffix=central_dns_suffix,
api_version=self._api_version,
)
return central_services.device.run_command(
cmd=self._cmd,
app_id=self._app_id,
token=self._token,
device_id=device_id,
component_name=component_name,
module_name=module_name,
command_name=command_name,
payload=payload,
central_dns_suffix=central_dns_suffix,
api_version=self._api_version,
)
def get_command_history(
self,
device_id: str,
interface_id: str,
component_name: str,
module_name: str,
command_name: str,
central_dns_suffix=CENTRAL_ENDPOINT,
):
if interface_id and self._is_interface_id_component(
device_id=device_id,
interface_id=interface_id,
central_dns_suffix=central_dns_suffix,
):
return central_services.device.get_command_history(
cmd=self._cmd,
app_id=self._app_id,
token=self._token,
device_id=device_id,
component_name=interface_id,
module_name=module_name,
command_name=command_name,
central_dns_suffix=central_dns_suffix,
api_version=self._api_version,
)
return central_services.device.get_command_history(
cmd=self._cmd,
app_id=self._app_id,
token=self._token,
device_id=device_id,
component_name=component_name,
module_name=module_name,
command_name=command_name,
central_dns_suffix=central_dns_suffix,
api_version=self._api_version,
)
def get_module_command_history(
self,
device_id: str,
module_name: str,
component_name: str,
command_name: str,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.get_module_command_history(
cmd=self._cmd,
app_id=self._app_id,
token=self._token,
device_id=device_id,
module_name=module_name,
component_name=component_name,
command_name=command_name,
central_dns_suffix=central_dns_suffix,
api_version=self._api_version,
)
def list_device_modules(
self,
device_id,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> List[EdgeModule]:
modules = central_services.device.list_device_modules(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
if not modules:
return []
return modules
def restart_device_module(
self,
device_id,
module_id,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> List[EdgeModule]:
status = central_services.device.restart_device_module(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
module_id=module_id,
token=self._token,
central_dns_suffix=central_dns_suffix,
)
if not status or status != 200:
raise ResourceNotFoundError(
"No module found for device {} with id: '{}'.".format(
device_id, module_id
)
)
return status
def get_device_twin(
self,
device_id,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> DeviceTwin:
twin = central_services.device.get_device_twin(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
token=self._token,
central_dns_suffix=central_dns_suffix,
)
if not twin:
raise ResourceNotFoundError(
"No twin found for device with id: '{}'.".format(device_id)
)
return twin
def run_manual_failover(
self,
device_id: str,
ttl_minutes: int = None,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.run_manual_failover(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
ttl_minutes=ttl_minutes,
token=self._token,
central_dns_suffix=central_dns_suffix,
)
def run_manual_failback(
self,
device_id: str,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.run_manual_failback(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
token=self._token,
central_dns_suffix=central_dns_suffix,
)
def purge_c2d_messages(
self,
device_id: str,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.purge_c2d_messages(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
token=self._token,
central_dns_suffix=central_dns_suffix,
)
def get_device_attestation(
self,
device_id: str,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.get_device_attestation(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
def delete_device_attestation(
self,
device_id: str,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.delete_device_attestation(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
def update_device_attestation(
self,
device_id: str,
payload,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.update_device_attestation(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
payload=payload,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
def create_device_attestation(
self,
device_id: str,
payload,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.create_device_attestation(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
payload=payload,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
def list_modules(
self,
device_id: str,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.list_modules(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
def list_device_components(
self,
device_id: str,
module_name: str = None,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.list_device_components(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
module_name=module_name,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
def get_device_properties(
self,
device_id: str,
component_name: str = None,
module_name: str = None,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.get_device_properties_or_telemetry_value(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
module_name=module_name,
component_name=component_name,
telemetry_name=None,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
def replace_device_properties(
self,
device_id: str,
payload: str,
component_name: str = None,
module_name: str = None,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.replace_properties(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
module_name=module_name,
component_name=component_name,
payload=payload,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
def update_device_properties(
self,
device_id: str,
payload: str,
component_name: str = None,
module_name: str = None,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.update_properties(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
module_name=module_name,
component_name=component_name,
payload=payload,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
def get_telemetry_value(
self,
device_id: str,
component_name: str,
module_name: str,
telemetry_name: str,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.get_device_properties_or_telemetry_value(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
module_name=module_name,
component_name=component_name,
telemetry_name=telemetry_name,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
def replace_device_component_properties(
self,
device_id: str,
component_name: str,
payload: str,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.replace_properties(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
module_name=None,
component_name=component_name,
payload=payload,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
def update_device_component_properties(
self,
device_id: str,
component_name: str,
payload: str,
central_dns_suffix=CENTRAL_ENDPOINT,
):
return central_services.device.update_properties(
cmd=self._cmd,
app_id=self._app_id,
device_id=device_id,
module_name=None,
component_name=component_name,
payload=payload,
token=self._token,
api_version=self._api_version,
central_dns_suffix=central_dns_suffix,
)
def _dps_populate_essential_info(self, dps_info, device_status: DeviceStatus):
error = {
DeviceStatus.provisioned: "None.",
DeviceStatus.registered: "Device is not yet provisioned.",
DeviceStatus.blocked: "Device is blocked from connecting to IoT Central application."
" Unblock the device in IoT Central and retry. Learn more: https://aka.ms/iotcentral-docs-dps-SAS",
DeviceStatus.unassociated: "Device does not have a valid template associated with it.",
}
filtered_dps_info = {
"status": dps_info.get("status"),
"error": error.get(device_status),
}
return filtered_dps_info
def _is_interface_id_component(
self,
device_id: str,
interface_id: str,
central_dns_suffix=CENTRAL_ENDPOINT,
) -> bool:
current_device = self.get_device(device_id, central_dns_suffix)
template = central_services.device_template.get_device_template(
cmd=self._cmd,
app_id=self._app_id,
device_template_id=current_device.instance_of
if self._api_version == ApiVersion.preview.value
else current_device.template,
token=self._token,
central_dns_suffix=central_dns_suffix,
api_version=self._api_version,
)
if interface_id in template.components:
return True
for module in template.modules:
if interface_id in module.components:
return True
return False