azext_iot/monitor/parsers/strings.py (91 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
# Encodings listed as expected in the message built by invalid_encoding().
SUPPORTED_ENCODING = [
    "utf-8",
]

# Character ranges listed as allowed in the message built by invalid_field_name().
SUPPORTED_FIELD_NAME_CHARS = [
    "a-z",
    "A-Z",
    "0-9",
    "_",
]

# Content types considered supported.
SUPPORTED_CONTENT_TYPE = [
    "application/json",
]

# Message headers listed as supported by invalid_custom_headers(); currently none.
SUPPORTED_MESSAGE_HEADERS = []
def unknown_device_id():
    """Return the error-level message reporting that no device ID was found."""
    message = "Device ID not found in message"
    return message
def invalid_json():
    """Return the error-level message reporting an invalid JSON format."""
    message = "Invalid JSON format."
    return message
def invalid_encoding(encoding: str):
    """Return the warning-level message for an unsupported encoding.

    :param encoding: the encoding value to report as unsupported.
    :return: message naming ``encoding`` and the module-level SUPPORTED_ENCODING list.
    """
    template = (
        "Encoding type '{actual}' is not supported. "
        "Expected encoding '{expected}'."
    )
    return template.format(actual=encoding, expected=SUPPORTED_ENCODING)
def invalid_field_name(invalid_field_names: list):
    """Return the error-level message listing field names with invalid characters.

    :param invalid_field_names: the offending field names, formatted into the message as-is.
    :return: message naming the fields and the module-level SUPPORTED_FIELD_NAME_CHARS list.
    """
    template = (
        "Invalid characters detected. Invalid field names: '{0}'. "
        "Allowed characters: {1}."
    )
    return template.format(invalid_field_names, SUPPORTED_FIELD_NAME_CHARS)
def invalid_pnp_property_not_value_wrapped():
    """Placeholder with no message yet; always raises NotImplementedError."""
    raise NotImplementedError
def invalid_non_pnp_field_name_duplicate():
    """Placeholder with no message yet; always raises NotImplementedError."""
    raise NotImplementedError
def content_type_mismatch(actual_content_type: str, expected_content_type: str):
    """Return the warning-level message for an unsupported content type.

    :param actual_content_type: the content type reported as unsupported.
    :param expected_content_type: the content type reported as expected.
    :return: the formatted message.
    """
    template = (
        "Content type '{actual}' is not supported. "
        "Expected Content type is '{expected}'."
    )
    return template.format(
        actual=actual_content_type, expected=expected_content_type
    )
def invalid_custom_headers():
    """Return the message stating that custom message headers are dropped.

    :return: message that embeds the module-level SUPPORTED_MESSAGE_HEADERS list.
    """
    template = (
        "Custom message headers are not supported in IoT Central. "
        "All the custom message headers will be dropped. "
        "Supported message headers: '{0}'."
    )
    return template.format(SUPPORTED_MESSAGE_HEADERS)
# warning
def invalid_component_name(component_name: str, allowed_components: list):
    """Return the message for a device-specified component that is unknown.

    :param component_name: the component name the device specified.
    :param allowed_components: the allowed components, formatted into the message as-is.
    :return: the formatted message.
    """
    template = (
        "Device is specifying a component that is unknown. "
        "Device specified component: '{0}'. "
        "Allowed components: '{1}'."
    )
    return template.format(component_name, allowed_components)
# warning
def invalid_field_name_mismatch_template(
    unmodeled_capabilities: list, modeled_capabilities: list
):
    """Return the message for data not defined in the device template (by interface).

    :param unmodeled_capabilities: capabilities reported as NOT defined in the template.
    :param modeled_capabilities: capabilities reported as defined, grouped by interface.
    :return: the formatted message (it ends with a single trailing space).
    """
    template = (
        "Device is sending data that has not been defined in the device template. "
        "Following capabilities have NOT been defined in the device template '{unmodeled}'. "
        "Following capabilities have been defined in the device template (grouped by interface) '{modeled}'. "
    )
    return template.format(
        unmodeled=unmodeled_capabilities, modeled=modeled_capabilities
    )
# warning
def invalid_field_name_component_mismatch_template(
    unmodeled_capabilities: list, modeled_capabilities: list
):
    """Return the message for data not defined in the device template (by component).

    :param unmodeled_capabilities: capabilities reported as NOT defined in the template.
    :param modeled_capabilities: capabilities reported as defined, grouped by components.
    :return: the formatted message (it ends with a single trailing space).
    """
    template = (
        "Device is sending data that has not been defined in the device template. "
        "Following capabilities have NOT been defined in the device template '{unmodeled}'. "
        "Following capabilities have been defined in the device template (grouped by components) '{modeled}'. "
    )
    return template.format(
        unmodeled=unmodeled_capabilities, modeled=modeled_capabilities
    )
# warning
def duplicate_property_name(duplicate_prop_name, interfaces: list):
    """Return the message for a property name found under several interfaces.

    :param duplicate_prop_name: the property name that is duplicated.
    :param interfaces: the interfaces containing the property, formatted as-is.
    :return: the formatted message.
    """
    # The message previously misspelled "property" as "propery".
    return (
        "Duplicate property: '{}' found under following interfaces {} in the device model. "
        "Either provide the interface name as part of the device payload or make the property name unique in the device model"
    ).format(duplicate_prop_name, interfaces)
# error
def invalid_primitive_schema_mismatch_template(field_name: str, data_type: str, data):
    """Return the message for a telemetry field whose data does not match its datatype.

    :param field_name: name of the telemetry field.
    :param data_type: the datatype the field is compared against.
    :param data: the data sent by the device, formatted into the message as-is.
    :return: the formatted message, including a link to further information.
    """
    template = (
        "Datatype of telemetry field '{0}' does not match the datatype {1}. "
        "Data sent by the device : {2}. "
        "For more information, see: https://aka.ms/iotcentral-payloads"
    )
    return template.format(field_name, data_type, data)
# to remove
def invalid_interface_name_not_found():
    """Return the message reporting that the interface name was not found."""
    message = "Interface name not found."
    return message
# to remove
def invalid_interface_name_mismatch(
    expected_interface_name: str, actual_interface_name: str
):
    """Return the message reporting an interface name mismatch.

    :param expected_interface_name: the interface name that was expected.
    :param actual_interface_name: the interface name actually received.
    :return: the formatted message.
    """
    # The message previously misspelled "Interface" as "Inteface".
    return "Interface name mismatch. Expected: {}, Actual: {}.".format(
        expected_interface_name, actual_interface_name
    )
# warning; downgrade to info if needed
def invalid_system_properties():
    """Return the message reporting that system properties could not be parsed."""
    message = "Failed to parse system properties."
    return message
# warning
def invalid_encoding_none_found():
    """Return the message reporting that no encoding was found in the message header."""
    message = "No encoding found. Expected encoding 'utf-8' to be present in message header."
    return message
# warning
def invalid_encoding_missing():
    """Return the message reporting that the content type is absent from system properties.

    NOTE(review): the function name says "encoding" while the message says
    "Content type" - confirm against callers which one is intended.
    """
    message = "Content type not found in system properties."
    return message
# warning
def invalid_annotations():
    """Return the message reporting that message.annotations could not be decoded."""
    message = "Unable to decode message.annotations."
    return message
# warning
def invalid_application_properties():
    """Return the message reporting that message.application_properties could not be decoded."""
    message = "Unable to decode message.application_properties."
    return message
# error
def device_template_not_found(error: Exception):
    """Return the message reporting a failure to retrieve a template.

    :param error: the error whose string form is embedded in the message.
    :return: the formatted message.
    """
    error_text = str(error)
    template = "Error retrieving template '{0}'. Please try again later."
    return template.format(error_text)
# error
def invalid_template_extract_schema_failed(template: dict):
    """Return the message reporting that no device schema could be extracted.

    :param template: the template, formatted into the message as-is.
    :return: the formatted message.
    """
    message_format = "Unable to extract device schema from template '{0}'."
    return message_format.format(template)