azext_iot/monitor/property.py (183 lines of code) (raw):

# coding=utf-8 # -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- from azext_iot.central.models.enum import ApiVersion import datetime import isodate import time from azext_iot.monitor.parsers import strings from azext_iot.monitor.models.enum import Severity from azext_iot.constants import ( CENTRAL_ENDPOINT, DEVICETWIN_POLLING_INTERVAL_SEC, DEVICETWIN_MONITOR_TIME_SEC, PNP_DTDLV2_COMPONENT_MARKER, ) from azext_iot.central.models.devicetwin import Property from azext_iot.central.providers import ( CentralDeviceProvider, CentralDeviceTemplateProvider, ) from azext_iot.monitor.parsers.issue import IssueHandler class PropertyMonitor: def __init__( self, cmd, app_id: str, device_id: str, token: str, central_dns_suffix=CENTRAL_ENDPOINT, ): self._cmd = cmd self._app_id = app_id self._device_id = device_id self._token = token self._central_dns_suffix = central_dns_suffix self._central_device_provider = CentralDeviceProvider( cmd=self._cmd, app_id=self._app_id, token=self._token, api_version=ApiVersion.ga.value, ) self._central_template_provider = CentralDeviceTemplateProvider( cmd=self._cmd, app_id=self._app_id, token=self._token, api_version=ApiVersion.ga.value, ) self._template = self._get_device_template() def _compare_properties(self, prev_prop: Property, prop: Property): if prev_prop.version == prop.version: return changes = { key: self._changed_props( prop.props[key], prop.metadata[key], key, ) for key, val in prop.metadata.items() if self._is_relevant(key, val) } return changes def _is_relevant(self, key, val): if key in {"$lastUpdated", "$lastUpdatedVersion"}: return False updated_within = datetime.datetime.now() - datetime.timedelta( seconds=DEVICETWIN_MONITOR_TIME_SEC ) last_updated = isodate.parse_datetime(val["$lastUpdated"]) return last_updated.timestamp() >= updated_within.timestamp() def _changed_props(self, prop, metadata, property_name): # not an interface - whole thing is change log if not self._is_component(prop): return prop # iterate over properties in the component # if the property is not an exact match for what is present in the previous set of properties # track it as a change diff = { key: prop[key] for key, val in metadata.items() if self._is_relevant(key, val) } return diff def _is_component(self, prop): return isinstance(prop, dict) and prop.get(PNP_DTDLV2_COMPONENT_MARKER) == "c" def _validate_payload(self, changes, minimum_severity): for value in changes: issues = self._validate_payload_against_entities( changes[value], value, minimum_severity ) for issue in issues: issue.log() def _validate_payload_against_entities(self, payload: dict, name, minimum_severity): name_miss = [] issues_handler = IssueHandler() if not self._is_component(payload): # update is not part of a component check under interfaces schema = self._template.get_schema(name=name) if not schema: name_miss.append(name) details = strings.invalid_field_name_mismatch_template( name_miss, self._template.schema_names ) interfaces_with_specified_property = ( self._template._get_interface_list_property(name) ) if len(interfaces_with_specified_property) > 1: details = strings.duplicate_property_name( name, interfaces_with_specified_property ) issues_handler.add_central_issue( severity=Severity.warning, details=details, message=None, device_id=self._device_id, template_id=self._template.id, ) else: # Property update is part of a component perform additional validations under component list. component_property_updates = [ property_name for property_name in payload if property_name != PNP_DTDLV2_COMPONENT_MARKER ] for property_name in component_property_updates: schema = self._template.get_schema( name=property_name, identifier=name, is_component=True ) if not schema: name_miss.append(property_name) details = strings.invalid_field_name_component_mismatch_template( name_miss, self._template.component_schema_names ) if name_miss: issues_handler.add_central_issue( severity=Severity.warning, details=details, message=None, device_id=self._device_id, template_id=self._template.id, ) return issues_handler.get_issues_with_minimum_severity(minimum_severity) def _get_device_template(self): device = self._central_device_provider.get_device( self._device_id, central_dns_suffix=self._central_dns_suffix ) template = self._central_template_provider.get_device_template( device_template_id=device.template, central_dns_suffix=self._central_dns_suffix, ) return template def start_property_monitor( self, ): prev_twin = None while True: twin = self._central_device_provider.get_device_twin( device_id=self._device_id, central_dns_suffix=self._central_dns_suffix ) if prev_twin: change_d = self._compare_properties( prev_twin.desired_property, twin.desired_property, ) change_r = self._compare_properties( prev_twin.reported_property, twin.reported_property ) if change_d: print("Changes in desired properties:") print("version :", twin.desired_property.version) print(change_d) if change_r: print("Changes in reported properties:") print("version :", twin.reported_property.version) print(change_r) time.sleep(DEVICETWIN_POLLING_INTERVAL_SEC) prev_twin = twin def start_validate_property_monitor(self, minimum_severity): prev_twin = None while True: twin = self._central_device_provider.get_device_twin( device_id=self._device_id, central_dns_suffix=self._central_dns_suffix ) if prev_twin: change_r = self._compare_properties( prev_twin.reported_property, twin.reported_property ) if change_r: self._validate_payload(change_r, minimum_severity) time.sleep(DEVICETWIN_POLLING_INTERVAL_SEC) prev_twin = twin