azext_iot/monitor/parsers/central_parser.py (123 lines of code) (raw):

# coding=utf-8 # -------------------------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- import re from uamqp.message import Message from azext_iot.central.providers import CentralDeviceProvider from azext_iot.central.providers import CentralDeviceTemplateProvider from azext_iot.monitor.parsers import strings from azext_iot.monitor.central_validator import validate, extract_schema_type from azext_iot.monitor.models.arguments import CommonParserArguments from azext_iot.monitor.models.enum import Severity from azext_iot.monitor.parsers.common_parser import CommonParser from azext_iot.constants import CENTRAL_ENDPOINT from azext_iot.central.models.v2022_06_30_preview import TemplatePreview class CentralParser(CommonParser): def __init__( self, message: Message, common_parser_args: CommonParserArguments, central_device_provider: CentralDeviceProvider, central_template_provider: CentralDeviceTemplateProvider, central_dns_suffix=CENTRAL_ENDPOINT, ): super(CentralParser, self).__init__( message=message, common_parser_args=common_parser_args ) self._central_device_provider = central_device_provider self._central_template_provider = central_template_provider self._central_dns_suffix = central_dns_suffix self._template_id = None def _add_central_issue(self, severity: Severity, details: str): self.issues_handler.add_central_issue( severity=severity, details=details, message=self._message, device_id=self.device_id, template_id=self._template_id, ) def parse_message(self) -> dict: parsed_message = super(CentralParser, self).parse_message() payload = parsed_message["event"]["payload"] self._perform_static_validations(payload=payload) # disable dynamic validations until Microservices work is figured out self._perform_dynamic_validations(payload=payload) return parsed_message # Static validations should only need information present in the payload # i.e. there should be no need for network calls def _perform_static_validations(self, payload: dict): # if its not a dictionary, something else went wrong with parsing if not isinstance(payload, dict): return self._validate_field_names(payload=payload) def _validate_field_names(self, payload: dict): # source: # https://github.com/Azure/IoTPlugandPlay/tree/master/DTDL regex = "^[a-zA-Z_][a-zA-Z0-9_]*$" # if a field name does not match the above regex, it is an invalid field name invalid_field_names = [ field_name for field_name in payload.keys() if not re.search(regex, field_name) ] if invalid_field_names: details = strings.invalid_field_name(invalid_field_names) self._add_issue(severity=Severity.error, details=details) # Dynamic validations should need data external to the payload # e.g. device template def _perform_dynamic_validations(self, payload: dict): # if the payload is not a dictionary some other parsing error occurred if not isinstance(payload, dict): return template = self._get_template() if not isinstance(template, TemplatePreview): return # if component name is not defined then data should be mapped to root/inherited interfaces if not self.component_name: self._validate_payload( payload=payload, template=template, is_component=False ) return if not template.components: # template does not have any valid components details = strings.invalid_component_name(self.component_name, []) self._add_central_issue(severity=Severity.warning, details=details) return # if component name is defined check to see if its a valid name if self.component_name not in template.components: details = strings.invalid_component_name( self.component_name, list(template.components.keys()) ) self._add_central_issue(severity=Severity.warning, details=details) return # if component name is valid check to see if payload is valid self._validate_payload(payload=payload, template=template, is_component=True) def _get_template(self): try: device = self._central_device_provider.get_device( self.device_id, central_dns_suffix=self._central_dns_suffix ) template = self._central_template_provider.get_device_template( device.template, central_dns_suffix=self._central_dns_suffix ) self._template_id = template.id return template except Exception as e: details = strings.device_template_not_found(e) self._add_central_issue(severity=Severity.error, details=details) # currently validates: # 1) primitive types match (e.g. boolean is indeed bool etc) # 2) names match (i.e. Humidity vs humidity etc) def _validate_payload( self, payload: dict, template: TemplatePreview, is_component: bool ): name_miss = [] for telemetry_name, telemetry in payload.items(): schema = template.get_schema( name=telemetry_name, identifier=self.component_name, is_component=is_component, ) if not schema: name_miss.append(telemetry_name) else: self._process_telemetry(telemetry_name, schema, telemetry) if name_miss: if is_component: details = strings.invalid_field_name_component_mismatch_template( name_miss, template.component_schema_names ) else: details = strings.invalid_field_name_mismatch_template( name_miss, template.schema_names, ) self._add_central_issue(severity=Severity.warning, details=details) def _process_telemetry(self, telemetry_name: str, schema, telemetry): expected_type = extract_schema_type(schema) is_payload_valid = validate(schema, telemetry) if expected_type and not is_payload_valid: details = strings.invalid_primitive_schema_mismatch_template( telemetry_name, expected_type, telemetry ) self._add_central_issue(severity=Severity.error, details=details)