azext_iot/monitor/parsers/common_parser.py (151 lines of code) (raw):

# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import json
import re

from azext_iot.iothub.common import NON_DECODABLE_PAYLOAD
from azext_iot.monitor.utility import unicode_decode
from uamqp.message import Message

from azext_iot.common.utility import parse_entity, unicode_binary_map
from azext_iot.monitor.base_classes import AbstractBaseParser
from azext_iot.monitor.parsers import strings
from azext_iot.monitor.models.arguments import CommonParserArguments
from azext_iot.monitor.models.enum import Severity
from azext_iot.monitor.parsers.issue import IssueHandler

DEVICE_ID_IDENTIFIER = b"iothub-connection-device-id"
MODULE_ID_IDENTIFIER = b"iothub-connection-module-id"
INTERFACE_NAME_IDENTIFIER_V1 = b"iothub-interface-name"
INTERFACE_NAME_IDENTIFIER_V2 = b"dt-dataschema"
COMPONENT_NAME_IDENTIFIER = b"dt-subject"


class CommonParser(AbstractBaseParser):
    """
    Parses a single AMQP message into an event dictionary.

    Identity fields (device, module, interface, component) are read from the
    message annotations at construction time. Problems found while parsing are
    recorded on ``self.issues_handler`` instead of being raised.
    """

    def __init__(self, message: Message, common_parser_args: CommonParserArguments):
        """
        Store the message and arguments, then extract the identity fields.

        Args:
            message: The AMQP message to parse.
            common_parser_args: Options read by ``parse_message``
                (``properties`` and ``content_type``).
        """
        self.issues_handler = IssueHandler()
        self._common_parser_args = common_parser_args
        self._message = message
        # _parse_device_id may call _add_issue, which reads self.device_id,
        # so the attribute must exist before the real value is parsed.
        self.device_id = ""  # need to default
        self.device_id = self._parse_device_id(message)
        self.module_id = self._parse_module_id(message)
        self.interface_name = self._parse_interface_name(message)
        self.component_name = self._parse_component_name(message)

    def parse_message(self) -> dict:
        """
        Parses an AMQP based IoT Hub telemetry event.

        Returns:
            A dictionary of the form ``{"event": {...}}``. The inner dictionary
            always has ``origin``, ``module``, ``interface``, ``component`` and
            ``payload``; ``annotations`` and ``properties`` are added depending
            on the requested property categories ("anno", "sys", "app", "all").
        """
        message = self._message
        properties = self._common_parser_args.properties
        content_type = self._common_parser_args.content_type

        event = {}
        event["origin"] = self.device_id
        event["module"] = self.module_id
        event["interface"] = self.interface_name
        event["component"] = self.component_name

        if not properties:
            properties = []  # guard against None being passed in

        system_properties = self._parse_system_properties(message)

        # Return value is intentionally unused: the call records an issue when
        # the content encoding is missing or is not utf-8.
        self._parse_content_encoding(message, system_properties)

        content_type = self._parse_content_type(content_type, system_properties)

        if properties:
            event["properties"] = {}

        if "anno" in properties or "all" in properties:
            annotations = self._parse_annotations(message)
            event["annotations"] = annotations

        if system_properties and ("sys" in properties or "all" in properties):
            event["properties"]["system"] = system_properties

        if "app" in properties or "all" in properties:
            application_properties = self._parse_application_properties(message)
            event["properties"]["application"] = application_properties

        payload = self._parse_payload(message, content_type)
        event["payload"] = payload

        event_source = {"event": event}
        return event_source

    def _add_issue(self, severity: Severity, details: str):
        """Record an issue for the current message and device on the issue handler."""
        self.issues_handler.add_issue(
            severity=severity,
            details=details,
            message=self._message,
            device_id=self.device_id,
        )

    def _parse_device_id(self, message: Message) -> str:
        """
        Return the device id annotation decoded as utf8.

        If the annotation is missing or cannot be decoded, an error issue is
        recorded and an empty string is returned.
        """
        try:
            return str(message.annotations.get(DEVICE_ID_IDENTIFIER), "utf8")
        except Exception:
            details = strings.unknown_device_id()
            self._add_issue(severity=Severity.error, details=details)
            return ""

    def _parse_module_id(self, message: Message) -> str:
        """Return the module id annotation decoded as utf8, or "" if unavailable."""
        try:
            return str(message.annotations.get(MODULE_ID_IDENTIFIER), "utf8")
        except Exception:
            # a message not containing an module name is expected for non-edge devices
            # so there's no "issue" to log here
            return ""

    def _parse_interface_name(self, message: Message) -> str:
        """Return the interface name annotation decoded as utf8, or "" if unavailable."""
        try:
            # Grab either the DTDL v1 or v2 amqp interface identifier.
            # It's highly unlikely both will be present at the same time
            # as they reflect different versions of a Plug & Play device.
            target_interface = message.annotations.get(
                INTERFACE_NAME_IDENTIFIER_V1
            ) or message.annotations.get(INTERFACE_NAME_IDENTIFIER_V2)
            return str(target_interface, "utf8")
        except Exception:
            # a message not containing an interface name is expected for non-pnp devices
            # so there's no "issue" to log here
            return ""

    def _parse_component_name(self, message: Message) -> str:
        """Return the component name annotation decoded as utf8, or "" if unavailable."""
        try:
            return str(message.annotations.get(COMPONENT_NAME_IDENTIFIER), "utf8")
        except Exception:
            return ""

    def _parse_system_properties(self, message: Message):
        """
        Return the message's system properties as a mapping.

        On any failure a warning issue is recorded and an empty dict is returned.
        """
        try:
            return unicode_binary_map(parse_entity(message.properties, True))
        except Exception:
            details = strings.invalid_system_properties()
            self._add_issue(severity=Severity.warning, details=details)
            return {}

    def _parse_content_encoding(self, message: Message, system_properties) -> str:
        """
        Validate the ``content_encoding`` system property.

        Returns:
            The content encoding when it is present and contains "utf-8"
            (case-insensitive). Otherwise a warning issue is recorded and
            ``None`` is returned.
        """
        content_encoding = ""

        if "content_encoding" in system_properties:
            content_encoding = system_properties["content_encoding"]

        if not content_encoding:
            details = strings.invalid_encoding_none_found()
            self._add_issue(severity=Severity.warning, details=details)
            return None

        if "utf-8" not in content_encoding.lower():
            details = strings.invalid_encoding(content_encoding.lower())
            self._add_issue(severity=Severity.warning, details=details)
            return None

        return content_encoding

    def _parse_content_type(
        self, expected_content_type: str, system_properties: dict
    ) -> str:
        """
        Decide which content type should be used to parse the payload.

        Args:
            expected_content_type: Content type the caller expects, or a falsy
                value when there is no expectation.
            system_properties: Parsed system properties of the message.

        Returns:
            The content type to parse the payload with. When the expected and
            actual types differ, a warning issue is recorded and the expected
            type (lower-cased) is returned.
        """
        # `or ""` also covers the key being present with a None value, which
        # the .get() default alone would not.
        actual_content_type = system_properties.get("content_type") or ""

        # Device data is not expected to be of a certain type
        # Continue parsing per rules that the device is sending
        if not expected_content_type:
            return actual_content_type.lower()

        # Device is expected to send data in a certain format.
        # Data from device implies the data is in an incorrect format.
        # Log the issue, and continue parsing as if device is in expected format.
        if actual_content_type.lower() != expected_content_type.lower():
            details = strings.content_type_mismatch(
                actual_content_type, expected_content_type
            )
            self._add_issue(severity=Severity.warning, details=details)
            return expected_content_type.lower()

        return actual_content_type

    def _parse_annotations(self, message: Message):
        """
        Return the message annotations as a mapping.

        On any failure a warning issue is recorded and an empty dict is returned.
        """
        try:
            return unicode_binary_map(message.annotations)
        except Exception:
            details = strings.invalid_annotations()
            self._add_issue(severity=Severity.warning, details=details)
            return {}

    def _parse_application_properties(self, message: Message):
        """
        Return the message application properties as a mapping.

        On any failure a warning issue is recorded and an empty dict is returned.
        """
        try:
            return unicode_binary_map(message.application_properties)
        except Exception:
            details = strings.invalid_application_properties()
            self._add_issue(severity=Severity.warning, details=details)
            return {}

    def _parse_payload(self, message: Message, content_type):
        """
        Decode the first body chunk of the message.

        Args:
            message: The AMQP message whose body is read.
            content_type: Content type used to decide whether to parse JSON;
                a falsy value is treated as "no content type".

        Returns:
            The decoded payload, or the parsed JSON object when the content
            type contains "application/json". An empty body yields "" before
            any JSON parsing is attempted.
        """
        payload = ""

        # get_data() is consumed as an iterator here; supplying a default keeps
        # an empty body from raising StopIteration out of the parser.
        data = next(message.get_data(), None)

        if data:
            payload = unicode_decode(data=data, default=NON_DECODABLE_PAYLOAD)

        if "application/json" in (content_type or "").lower():
            return self._try_parse_json(payload)

        return payload

    def _try_parse_json(self, payload):
        """
        Try to parse ``payload`` as JSON.

        Returns:
            The parsed JSON value, or the original payload unchanged when
            parsing fails (in which case an error issue is recorded).
        """
        result = payload
        try:
            # Removes escaped line-break sequences (a literal backslash followed
            # by "r" or "n") from the text before handing it to the JSON parser.
            regex = r"(\\r\\n)+|\\r+|\\n+"
            payload_no_white_space = re.sub(regex, "", payload)
            result = json.loads(payload_no_white_space)
        except Exception:
            details = strings.invalid_json()
            self._add_issue(severity=Severity.error, details=details)

        return result