in source/machine_connector/m2c2_opcda_connector/validations/message_validation.py [0:0]
def validate_schema(self, message: dict) -> None:
try:
if not isinstance(message, dict):
self.error_to_iot_and_raise(msg.ERR_MSG_SCHEMA_MESSAGE_NOT_DICT.format(message))
self.missing_keys = self.find_missing_keys(message, self.vals.payload_required_keys())
if self.missing_keys:
self.error_to_iot_and_raise(msg.ERR_MISSING_KEYS.format(self.missing_keys))
if not message["messages"]:
self.error_to_iot_and_raise(msg.ERR_MSG_SCHEMA_EMPTY_MESSAGES.format(message))
if not self.valid_val(message, self.vals.payload_validations()):
self.error_to_iot_and_raise(msg.ERR_MSG_SCHEMA_MISSING_KEY.format(message))
for entry in message["messages"]:
self.entry_missing_keys = self.find_missing_keys(
entry,
self.vals.messages_required_keys()
)
if self.entry_missing_keys:
self.error_to_iot_and_raise(msg.ERR_MISSING_KEYS.format(self.entry_missing_keys))
if not self.valid_val(entry, self.vals.msgs_validations()):
self.error_to_iot_and_raise(msg.ERR_MSG_SCHEMA_MISSING_KEY.format(entry))
if not self.parsable(entry):
self.error_to_iot_and_raise(msg.ERR_MSG_SCHEMA_DATE_CORRUPTED.format(entry))
if entry["name"] != message["alias"]:
self.error_to_iot_and_raise(msg.ERR_NAME_NOT_ALIAS.format(message))
except Exception as err:
self.logger.error("Message validation failed. Error: %s", str(err))
raise Exception(msg.ERR_MSG_VALIDATION.format(err))