mns/mns_xml_handler.py (515 lines of code) (raw):

#coding=utf-8 # Copyright (C) 2015, Alibaba Cloud Computing #Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: #The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. #THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. import xml.dom.minidom import sys import base64 import string import types from xml.etree import ElementTree from .mns_exception import * from .mns_request import * try: import json except ImportError: import simplejson as json XMLNS = "http://mns.aliyuncs.com/doc/v1/" class EncoderBase: @staticmethod def insert_if_valid(item_name, item_value, invalid_value, data_dic): if item_value != invalid_value: data_dic[item_name] = item_value @staticmethod def list_to_xml(tag_name1, tag_name2, data_list): doc = xml.dom.minidom.Document() rootNode = doc.createElement(tag_name1) rootNode.attributes["xmlns"] = XMLNS doc.appendChild(rootNode) if data_list: for item in data_list: keyNode = doc.createElement(tag_name2) rootNode.appendChild(keyNode) keyNode.appendChild(doc.createTextNode(item)) else: nullNode = doc.createTextNode("") rootNode.appendChild(nullNode) return doc.toxml("utf-8") @staticmethod def dic_to_xml(tag_name, data_dic): doc = xml.dom.minidom.Document() rootNode = doc.createElement(tag_name) rootNode.attributes["xmlns"] = XMLNS doc.appendChild(rootNode) if data_dic: for k,v in data_dic.items(): keyNode = doc.createElement(k) if type(v) is dict: for subkey,subv in v.items(): subNode = doc.createElement(subkey) subNode.appendChild(doc.createTextNode(subv)) keyNode.appendChild(subNode) else: #tmp = doc.createTextNode(v.decode('utf-8')) tmp = doc.createTextNode(v) keyNode.appendChild(tmp) #keyNode.appendChild(doc.createTextNode(v)) rootNode.appendChild(keyNode) else: nullNode = doc.createTextNode("") rootNode.appendChild(nullNode) return doc.toxml("utf-8") @staticmethod def listofdic_to_xml(root_tagname, sec_tagname, dataList): doc = xml.dom.minidom.Document() rootNode = doc.createElement(root_tagname) rootNode.attributes["xmlns"] = XMLNS doc.appendChild(rootNode) if dataList: for subData in dataList: secNode = doc.createElement(sec_tagname) rootNode.appendChild(secNode) if not subData: nullNode = doc.createTextNode("") secNode.appendChild(nullNode) continue for k,v in subData.items(): keyNode = doc.createElement(k) secNode.appendChild(keyNode) keyNode.appendChild(doc.createTextNode(v)) else: nullNode = doc.createTextNode("") rootNode.appendChild(nullNode) return doc.toxml("utf-8") class SetAccountAttrEncoder(EncoderBase): @staticmethod def encode(data): account_attr = {} EncoderBase.insert_if_valid("LoggingBucket", data.logging_bucket, None, account_attr) return EncoderBase.dic_to_xml("Account", account_attr) class QueueEncoder(EncoderBase): @staticmethod def encode(data, has_slice = True): queue = {} EncoderBase.insert_if_valid("VisibilityTimeout", str(data.visibility_timeout), "-1", queue) EncoderBase.insert_if_valid("MaximumMessageSize", str(data.maximum_message_size), "-1", queue) EncoderBase.insert_if_valid("MessageRetentionPeriod", str(data.message_retention_period), "-1", queue) EncoderBase.insert_if_valid("DelaySeconds", str(data.delay_seconds), "-1", queue) EncoderBase.insert_if_valid("PollingWaitSeconds", str(data.polling_wait_seconds), "-1", queue) logging_enabled = str(data.logging_enabled) if str(data.logging_enabled).lower() == "true": logging_enabled = "True" elif str(data.logging_enabled).lower() == "false": logging_enabled = "False" EncoderBase.insert_if_valid("LoggingEnabled", logging_enabled, "None", queue) return EncoderBase.dic_to_xml("Queue", queue) class MessageEncoder(EncoderBase): @staticmethod def encode(data): message = {} if data.base64encode: #base64 only support str tmpbody = data.message_body.encode('utf-8') msgbody = base64.b64encode(tmpbody).decode('utf-8') else: #xml only support unicode when contains Chinese if sys.version_info.major >= 3: msgbody = data.message_body else: msgbody = data.message_body.decode('utf-8') if isinstance(data.message_body, str) else data.message_body EncoderBase.insert_if_valid("MessageBody", msgbody, u"", message) EncoderBase.insert_if_valid("DelaySeconds", str(data.delay_seconds), u"-1", message) EncoderBase.insert_if_valid("Priority", str(data.priority), u"-1", message) return EncoderBase.dic_to_xml("Message", message) class MessagesEncoder: @staticmethod def encode(message_list, base64encode): msglist = [] for msg in message_list: item = {} if base64encode: #base64 only support str #tmpbody = msg.message_body.encode('utf-8') if isinstance(msg.message_body, unicode) else msg.message_body tmpbody = msg.message_body.encode('utf-8') msgbody = base64.b64encode(tmpbody).decode('utf-8') else: #xml only support unicode when contains Chinese if sys.version_info.major >= 3: msgbody = msg.message_body else: msgbody = msg.message_body.decode('utf-8') if isinstance(msg.message_body, str) else msg.message_body EncoderBase.insert_if_valid("MessageBody", msgbody, u"", item) EncoderBase.insert_if_valid("DelaySeconds", str(msg.delay_seconds), u"-1", item) EncoderBase.insert_if_valid("Priority", str(msg.priority), u"-1", item) msglist.append(item) return EncoderBase.listofdic_to_xml(u"Messages", u"Message", msglist) class TopicMessageEncoder: @staticmethod def encode(req): message = {} #xml only support unicode when contains Chinese msgbody = req.message_body EncoderBase.insert_if_valid("MessageBody", msgbody, "", message) EncoderBase.insert_if_valid("MessageTag", req.message_tag, "", message) msg_attr = {} if req.direct_mail is not None: msg_attr["DirectMail"] = json.dumps(req.direct_mail.get()) if req.direct_sms is not None: msg_attr["DirectSMS"] = json.dumps(req.direct_sms.get()) if msg_attr != {}: message["MessageAttributes"] = msg_attr return EncoderBase.dic_to_xml("Message", message) class ReceiptHandlesEncoder: @staticmethod def encode(receipt_handle_list): return EncoderBase.list_to_xml("ReceiptHandles", "ReceiptHandle", receipt_handle_list) class TopicEncoder(EncoderBase): @staticmethod def encode(data): topic = {} logging_enabled = str(data.logging_enabled) if str(data.logging_enabled).lower() == "true": logging_enabled = "True" elif str(data.logging_enabled).lower() == "false": logging_enabled = "False" EncoderBase.insert_if_valid("MaximumMessageSize", str(data.maximum_message_size), "-1", topic) EncoderBase.insert_if_valid("LoggingEnabled", logging_enabled, "None", topic) return EncoderBase.dic_to_xml("Topic", topic) class SubscriptionEncoder(EncoderBase): @staticmethod def encode(data, set=False): subscription = {} EncoderBase.insert_if_valid("NotifyStrategy", data.notify_strategy, "", subscription) if not set: EncoderBase.insert_if_valid("Endpoint", data.endpoint, "", subscription) EncoderBase.insert_if_valid("FilterTag", data.filter_tag, "", subscription) EncoderBase.insert_if_valid("NotifyContentFormat", data.notify_content_format, "", subscription) return EncoderBase.dic_to_xml("Subscription", subscription) #-------------------------------------------------decode-----------------------------------------------------# class DecoderBase: @staticmethod def xml_to_nodes(tag_name, xml_data): if xml_data == "": raise MNSClientNetworkException("RespDataDamaged", "Xml data is \"\"!") try: if (sys.version_info.major < 3) and (not isinstance(xml_data, str)): xml_data = xml_data.encode('utf-8') dom = xml.dom.minidom.parseString(xml_data) except Exception: raise MNSClientNetworkException("RespDataDamaged", xml_data) nodelist = dom.getElementsByTagName(tag_name) if not nodelist: raise MNSClientNetworkException("RespDataDamaged", "No element with tag name '%s'.\nData:%s" % (tag_name, xml_data)) return nodelist[0].childNodes @staticmethod def xml_to_dic(tag_name, xml_data, data_dic, req_id=None): try: for node in DecoderBase.xml_to_nodes(tag_name, xml_data): if node.nodeName != "#text": if node.childNodes != []: data_dic[node.nodeName] = node.firstChild.data else: data_dic[node.nodeName] = "" except MNSClientNetworkException as e: raise MNSClientNetworkException(e.type, e.message, req_id) @staticmethod def xml_to_listofdic(root_tagname, sec_tagname, xml_data, data_listofdic, req_id=None): try: for message in DecoderBase.xml_to_nodes(root_tagname, xml_data): if message.nodeName != sec_tagname: continue data_dic = {} for property in message.childNodes: if property.nodeName != "#text" and property.childNodes != []: data_dic[property.nodeName] = property.firstChild.data data_listofdic.append(data_dic) except MNSClientNetworkException as e: raise MNSClientNetworkException(e.type, e.message, req_id) class ListQueueDecoder(DecoderBase): @staticmethod def decode(xml_data, with_meta, req_id=None): queueurl_list = [] queuemeta_list = [] next_marker = u"" if (xml_data != ""): try: root = ElementTree.fromstring(xml_data) namespace = root.tag[0:-6] queues = list(root.iter(namespace + "Queue")) for queue in queues: queuemeta = {} for node in queue: nodename = node.tag[len(namespace):] nodevalue = node.text.strip() if nodename == "QueueURL" and len(nodevalue) > 0 : queueurl_list.append(nodevalue) if len(nodevalue) > 0: queuemeta[nodename] = nodevalue if with_meta: queuemeta_list.append(queuemeta) marker = list(root.iter(namespace + "NextMarker")) for node in marker: next_marker = node.text.strip() except Exception as err: raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) else: raise MNSClientNetworkException("RespDataDamaged", "Xml data is \"\"!", req_id) return queueurl_list, str(next_marker), queuemeta_list class GetAccountAttrDecoder(DecoderBase): @staticmethod def decode(xml_data, req_id=None): data_dic = {} DecoderBase.xml_to_dic("Account", xml_data, data_dic) key_list = ["LoggingBucket"] for key in key_list: if key not in data_dic: raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) return data_dic class GetQueueAttrDecoder(DecoderBase): @staticmethod def decode(xml_data, req_id=None): data_dic = {} DecoderBase.xml_to_dic("Queue", xml_data, data_dic, req_id) key_list = ["ActiveMessages", "CreateTime", "DelayMessages", "DelaySeconds", "InactiveMessages", "LastModifyTime", "MaximumMessageSize", "MessageRetentionPeriod", "QueueName", "VisibilityTimeout", "PollingWaitSeconds", "LoggingEnabled"] for key in key_list: if key not in data_dic.keys(): raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) return data_dic class SendMessageDecoder(DecoderBase): @staticmethod def decode(xml_data, req_id=None): data_dic = {} DecoderBase.xml_to_dic("Message", xml_data, data_dic, req_id) key_list = ["MessageId", "MessageBodyMD5"] for key in key_list: if key not in data_dic.keys(): raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) receipt_handle = "" if "ReceiptHandle" in data_dic.keys(): receipt_handle = data_dic["ReceiptHandle"] return data_dic["MessageId"], data_dic["MessageBodyMD5"], receipt_handle class BatchSendMessageDecoder(DecoderBase): @staticmethod def decode(xml_data, req_id=None): data_listofdic = [] message_list = [] DecoderBase.xml_to_listofdic("Messages", "Message", xml_data, data_listofdic, req_id) try: for data_dic in data_listofdic: entry = SendMessageResponseEntry() entry.message_id = data_dic["MessageId"] entry.message_body_md5 = data_dic["MessageBodyMD5"] message_list.append(entry) except Exception as err: raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) return message_list @staticmethod def decodeError(xml_data, req_id=None): try: return ErrorDecoder.decodeError(xml_data, req_id) except Exception: pass data_listofdic = [] DecoderBase.xml_to_listofdic("Messages", "Message", xml_data, data_listofdic, req_id) if len(data_listofdic) == 0: raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) errType = None errMsg = None key_list1 = sorted(["ErrorCode", "ErrorMessage"]) key_list2 = sorted(["MessageId", "MessageBodyMD5"]) for data_dic in data_listofdic: keys = sorted(data_dic.keys()) if keys != key_list1 and keys != key_list2: raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) if keys == key_list1 and errType is None: errType = data_dic["ErrorCode"] errMsg = data_dic["ErrorMessage"] return errType, errMsg, None, None, data_listofdic class RecvMessageDecoder(DecoderBase): @staticmethod def decode(xml_data, base64decode, req_id=None): data_dic = {} DecoderBase.xml_to_dic("Message", xml_data, data_dic, req_id) key_list = ["DequeueCount", "EnqueueTime", "FirstDequeueTime", "MessageBody", "MessageId", "MessageBodyMD5", "NextVisibleTime", "ReceiptHandle", "Priority"] for key in key_list: if key not in data_dic.keys(): raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) if base64decode: decode_str = base64.b64decode(data_dic["MessageBody"]) data_dic["MessageBody"] = decode_str return data_dic class BatchRecvMessageDecoder(DecoderBase): @staticmethod def decode(xml_data, base64decode, req_id=None): data_listofdic = [] message_list = [] DecoderBase.xml_to_listofdic("Messages", "Message", xml_data, data_listofdic, req_id) try: for data_dic in data_listofdic: msg = ReceiveMessageResponseEntry() if base64decode: msg.message_body = base64.b64decode(data_dic["MessageBody"]) else: msg.message_body = data_dic["MessageBody"] msg.dequeue_count = int(data_dic["DequeueCount"]) msg.enqueue_time = int(data_dic["EnqueueTime"]) msg.first_dequeue_time = int(data_dic["FirstDequeueTime"]) msg.message_id = data_dic["MessageId"] msg.message_body_md5 = data_dic["MessageBodyMD5"] msg.priority = int(data_dic["Priority"]) msg.next_visible_time = int(data_dic["NextVisibleTime"]) msg.receipt_handle = data_dic["ReceiptHandle"] message_list.append(msg) except Exception as err: raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) return message_list class PeekMessageDecoder(DecoderBase): @staticmethod def decode(xml_data, base64decode, req_id=None): data_dic = {} DecoderBase.xml_to_dic("Message", xml_data, data_dic, req_id) key_list = ["DequeueCount", "EnqueueTime", "FirstDequeueTime", "MessageBody", "MessageId", "MessageBodyMD5", "Priority"] for key in key_list: if key not in data_dic.keys(): raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) if base64decode: decode_str = base64.b64decode(data_dic["MessageBody"]) data_dic["MessageBody"] = decode_str return data_dic class BatchPeekMessageDecoder(DecoderBase): @staticmethod def decode(xml_data, base64decode, req_id=None): data_listofdic = [] message_list = [] DecoderBase.xml_to_listofdic("Messages", "Message", xml_data, data_listofdic, req_id) try: for data_dic in data_listofdic: msg = PeekMessageResponseEntry() if base64decode: msg.message_body = base64.b64decode(data_dic["MessageBody"]) else: msg.message_body = data_dic["MessageBody"] msg.dequeue_count = int(data_dic["DequeueCount"]) msg.enqueue_time = int(data_dic["EnqueueTime"]) msg.first_dequeue_time = int(data_dic["FirstDequeueTime"]) msg.message_id = data_dic["MessageId"] msg.message_body_md5 = data_dic["MessageBodyMD5"] msg.priority = int(data_dic["Priority"]) message_list.append(msg) except Exception as err: raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) return message_list class BatchDeleteMessageDecoder(DecoderBase): @staticmethod def decodeError(xml_data, req_id=None): try: return ErrorDecoder.decodeError(xml_data, req_id) except Exception: pass data_listofdic = [] DecoderBase.xml_to_listofdic("Errors", "Error", xml_data, data_listofdic, req_id) if len(data_listofdic) == 0: raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) key_list = sorted(["ErrorCode", "ErrorMessage", "ReceiptHandle"]) for data_dic in data_listofdic: for key in key_list: keys = sorted(data_dic.keys()) if keys != key_list: raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) return data_listofdic[0]["ErrorCode"], data_listofdic[0]["ErrorMessage"], None, None, data_listofdic class ChangeMsgVisDecoder(DecoderBase): @staticmethod def decode(xml_data, req_id=None): data_dic = {} DecoderBase.xml_to_dic("ChangeVisibility", xml_data, data_dic, req_id) if "ReceiptHandle" in data_dic.keys() and "NextVisibleTime" in data_dic.keys(): return data_dic["ReceiptHandle"], data_dic["NextVisibleTime"] else: raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) class ListTopicDecoder(DecoderBase): @staticmethod def decode(xml_data, with_meta, req_id=None): topicurl_list = [] topicmeta_list = [] next_marker = "" if (xml_data != ""): try: root = ElementTree.fromstring(xml_data) namespace = root.tag[0:-6] topics = list(root.iter(namespace + "Topic")) for topic in topics: topicMeta = {} for node in topic: nodeName = node.tag[len(namespace):] nodeValue = node.text.strip() if nodeName == "TopicURL" and len(nodeValue) > 0: topicurl_list.append(nodeValue) if len(nodeValue) > 0: topicMeta[nodeName] = nodeValue if with_meta: topicmeta_list.append(topicMeta) marker = list(root.iter(namespace + "NextMarker")) for node in marker: next_marker = node.text.strip() except Exception as err: raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) else: raise MNSClientNetworkException("RespDataDamaged", "Xml data is \"\"!", req_id) return topicurl_list, str(next_marker), topicmeta_list class GetTopicAttrDecoder(DecoderBase): @staticmethod def decode(xml_data, req_id=None): data_dic = {} DecoderBase.xml_to_dic("Topic", xml_data, data_dic, req_id) key_list = ["MessageCount", "CreateTime", "LastModifyTime", "MaximumMessageSize", "MessageRetentionPeriod", "TopicName", "LoggingEnabled"] for key in key_list: if key not in data_dic.keys(): raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) return data_dic class PublishMessageDecoder(DecoderBase): @staticmethod def decode(xml_data, req_id=None): data_dic = {} DecoderBase.xml_to_dic("Message", xml_data, data_dic, req_id) key_list = ["MessageId", "MessageBodyMD5"] for key in key_list: if key not in data_dic.keys(): raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) return data_dic["MessageId"], data_dic["MessageBodyMD5"] class ListSubscriptionByTopicDecoder(DecoderBase): @staticmethod def decode(xml_data, req_id=None): subscriptionurl_list = [] next_marker = "" if (xml_data != ""): try: root = ElementTree.fromstring(xml_data) namespace = root.tag[0:-13] subscriptions = list(root.iter(namespace + "Subscription")) for subscription in subscriptions: for node in subscription: nodeName = node.tag[len(namespace):] nodeValue = node.text.strip() if nodeName == "SubscriptionURL" and len(nodeValue) > 0: subscriptionurl_list.append(nodeValue) marker = list(root.iter(namespace + "NextMarker")) for node in marker: next_marker = node.text.strip() except Exception: raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) else: raise MNSClientNetworkException("RespDataDamaged", "Xml data is \"\"!", req_id) return subscriptionurl_list, str(next_marker) class GetSubscriptionAttrDecoder(DecoderBase): @staticmethod def decode(xml_data, req_id=None): data_dic = {} DecoderBase.xml_to_dic("Subscription", xml_data, data_dic, req_id) key_list = ["TopicOwner", "TopicName", "SubscriptionName", "Endpoint", "NotifyStrategy", "NotifyContentFormat", "CreateTime", "LastModifyTime"] for key in key_list: if key not in data_dic.keys(): raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) return data_dic class ErrorDecoder(DecoderBase): @staticmethod def decodeError(xml_data, req_id=None): data_dic = {} DecoderBase.xml_to_dic("Error", xml_data, data_dic, req_id) key_list = ["Code", "Message", "RequestId", "HostId"] for key in key_list: if key not in data_dic.keys(): raise MNSClientNetworkException("RespDataDamaged", xml_data, req_id) return data_dic["Code"], data_dic["Message"], data_dic["RequestId"], data_dic["HostId"], None class OpenServiceDecoder(DecoderBase): @staticmethod def decode(xml_data, req_id=None): data_dic = {} DecoderBase.xml_to_dic("OpenService", xml_data, data_dic, req_id) return data_dic