CustomScript/azure/servicebus/__init__.py [22:850]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    WindowsAzureData,
    WindowsAzureError,
    xml_escape,
    _create_entry,
    _general_error_handler,
    _get_entry_properties,
    _get_child_nodes,
    _get_children_from_path,
    _get_first_child_node_value,
    _ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_DELETE,
    _ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_UNLOCK,
    _ERROR_QUEUE_NOT_FOUND,
    _ERROR_TOPIC_NOT_FOUND,
    )
from azure.http import HTTPError

# default rule name for subscription
DEFAULT_RULE_NAME = '$Default'

#-----------------------------------------------------------------------------
# Constants for Azure app environment settings.
AZURE_SERVICEBUS_NAMESPACE = 'AZURE_SERVICEBUS_NAMESPACE'
AZURE_SERVICEBUS_ACCESS_KEY = 'AZURE_SERVICEBUS_ACCESS_KEY'
AZURE_SERVICEBUS_ISSUER = 'AZURE_SERVICEBUS_ISSUER'

# namespace used for converting rules to objects
XML_SCHEMA_NAMESPACE = 'http://www.w3.org/2001/XMLSchema-instance'


class Queue(WindowsAzureData):

    ''' Queue class corresponding to Queue Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780773'''

    def __init__(self, lock_duration=None, max_size_in_megabytes=None,
                 requires_duplicate_detection=None, requires_session=None,
                 default_message_time_to_live=None,
                 dead_lettering_on_message_expiration=None,
                 duplicate_detection_history_time_window=None,
                 max_delivery_count=None, enable_batched_operations=None,
                 size_in_bytes=None, message_count=None):

        self.lock_duration = lock_duration
        self.max_size_in_megabytes = max_size_in_megabytes
        self.requires_duplicate_detection = requires_duplicate_detection
        self.requires_session = requires_session
        self.default_message_time_to_live = default_message_time_to_live
        self.dead_lettering_on_message_expiration = \
            dead_lettering_on_message_expiration
        self.duplicate_detection_history_time_window = \
            duplicate_detection_history_time_window
        self.max_delivery_count = max_delivery_count
        self.enable_batched_operations = enable_batched_operations
        self.size_in_bytes = size_in_bytes
        self.message_count = message_count


class Topic(WindowsAzureData):

    ''' Topic class corresponding to Topic Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780749. '''

    def __init__(self, default_message_time_to_live=None,
                 max_size_in_megabytes=None, requires_duplicate_detection=None,
                 duplicate_detection_history_time_window=None,
                 enable_batched_operations=None, size_in_bytes=None):

        self.default_message_time_to_live = default_message_time_to_live
        self.max_size_in_megabytes = max_size_in_megabytes
        self.requires_duplicate_detection = requires_duplicate_detection
        self.duplicate_detection_history_time_window = \
            duplicate_detection_history_time_window
        self.enable_batched_operations = enable_batched_operations
        self.size_in_bytes = size_in_bytes

    @property
    def max_size_in_mega_bytes(self):
        import warnings
        warnings.warn(
            'This attribute has been changed to max_size_in_megabytes.')
        return self.max_size_in_megabytes

    @max_size_in_mega_bytes.setter
    def max_size_in_mega_bytes(self, value):
        self.max_size_in_megabytes = value


class Subscription(WindowsAzureData):

    ''' Subscription class corresponding to Subscription Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780763. '''

    def __init__(self, lock_duration=None, requires_session=None,
                 default_message_time_to_live=None,
                 dead_lettering_on_message_expiration=None,
                 dead_lettering_on_filter_evaluation_exceptions=None,
                 enable_batched_operations=None, max_delivery_count=None,
                 message_count=None):

        self.lock_duration = lock_duration
        self.requires_session = requires_session
        self.default_message_time_to_live = default_message_time_to_live
        self.dead_lettering_on_message_expiration = \
            dead_lettering_on_message_expiration
        self.dead_lettering_on_filter_evaluation_exceptions = \
            dead_lettering_on_filter_evaluation_exceptions
        self.enable_batched_operations = enable_batched_operations
        self.max_delivery_count = max_delivery_count
        self.message_count = message_count


class Rule(WindowsAzureData):

    ''' Rule class corresponding to Rule Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780753. '''

    def __init__(self, filter_type=None, filter_expression=None,
                 action_type=None, action_expression=None):
        self.filter_type = filter_type
        self.filter_expression = filter_expression
        self.action_type = action_type
        self.action_expression = action_type


class Message(WindowsAzureData):

    ''' Message class that used in send message/get mesage apis. '''

    def __init__(self, body=None, service_bus_service=None, location=None,
                 custom_properties=None,
                 type='application/atom+xml;type=entry;charset=utf-8',
                 broker_properties=None):
        self.body = body
        self.location = location
        self.broker_properties = broker_properties
        self.custom_properties = custom_properties
        self.type = type
        self.service_bus_service = service_bus_service
        self._topic_name = None
        self._subscription_name = None
        self._queue_name = None

        if not service_bus_service:
            return

        # if location is set, then extracts the queue name for queue message and
        # extracts the topic and subscriptions name if it is topic message.
        if location:
            if '/subscriptions/' in location:
                pos = location.find('/subscriptions/')
                pos1 = location.rfind('/', 0, pos - 1)
                self._topic_name = location[pos1 + 1:pos]
                pos += len('/subscriptions/')
                pos1 = location.find('/', pos)
                self._subscription_name = location[pos:pos1]
            elif '/messages/' in location:
                pos = location.find('/messages/')
                pos1 = location.rfind('/', 0, pos - 1)
                self._queue_name = location[pos1 + 1:pos]

    def delete(self):
        ''' Deletes itself if find queue name or topic name and subscription
        name. '''
        if self._queue_name:
            self.service_bus_service.delete_queue_message(
                self._queue_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        elif self._topic_name and self._subscription_name:
            self.service_bus_service.delete_subscription_message(
                self._topic_name,
                self._subscription_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        else:
            raise WindowsAzureError(_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_DELETE)

    def unlock(self):
        ''' Unlocks itself if find queue name or topic name and subscription
        name. '''
        if self._queue_name:
            self.service_bus_service.unlock_queue_message(
                self._queue_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        elif self._topic_name and self._subscription_name:
            self.service_bus_service.unlock_subscription_message(
                self._topic_name,
                self._subscription_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        else:
            raise WindowsAzureError(_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_UNLOCK)

    def add_headers(self, request):
        ''' add addtional headers to request for message request.'''

        # Adds custom properties
        if self.custom_properties:
            for name, value in self.custom_properties.items():
                if sys.version_info < (3,) and isinstance(value, unicode):
                    request.headers.append(
                        (name, '"' + value.encode('utf-8') + '"'))
                elif isinstance(value, str):
                    request.headers.append((name, '"' + str(value) + '"'))
                elif isinstance(value, datetime):
                    request.headers.append(
                        (name, '"' + value.strftime('%a, %d %b %Y %H:%M:%S GMT') + '"'))
                else:
                    request.headers.append((name, str(value).lower()))

        # Adds content-type
        request.headers.append(('Content-Type', self.type))

        # Adds BrokerProperties
        if self.broker_properties:
            request.headers.append(
                ('BrokerProperties', str(self.broker_properties)))

        return request.headers


def _create_message(response, service_instance):
    ''' Create message from response.

    response: response from service bus cloud server.
    service_instance: the service bus client.
    '''
    respbody = response.body
    custom_properties = {}
    broker_properties = None
    message_type = None
    message_location = None

    # gets all information from respheaders.
    for name, value in response.headers:
        if name.lower() == 'brokerproperties':
            broker_properties = json.loads(value)
        elif name.lower() == 'content-type':
            message_type = value
        elif name.lower() == 'location':
            message_location = value
        elif name.lower() not in ['content-type',
                                  'brokerproperties',
                                  'transfer-encoding',
                                  'server',
                                  'location',
                                  'date']:
            if '"' in value:
                value = value[1:-1]
                try:
                    custom_properties[name] = datetime.strptime(
                        value, '%a, %d %b %Y %H:%M:%S GMT')
                except ValueError:
                    custom_properties[name] = value
            else:  # only int, float or boolean
                if value.lower() == 'true':
                    custom_properties[name] = True
                elif value.lower() == 'false':
                    custom_properties[name] = False
                # int('3.1') doesn't work so need to get float('3.14') first
                elif str(int(float(value))) == value:
                    custom_properties[name] = int(value)
                else:
                    custom_properties[name] = float(value)

    if message_type == None:
        message = Message(
            respbody, service_instance, message_location, custom_properties,
            'application/atom+xml;type=entry;charset=utf-8', broker_properties)
    else:
        message = Message(respbody, service_instance, message_location,
                          custom_properties, message_type, broker_properties)
    return message

# convert functions


def _convert_response_to_rule(response):
    return _convert_xml_to_rule(response.body)


def _convert_xml_to_rule(xmlstr):
    ''' Converts response xml to rule object.

    The format of xml for rule:
<entry xmlns='http://www.w3.org/2005/Atom'>
<content type='application/xml'>
<RuleDescription
    xmlns:i="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
    <Filter i:type="SqlFilterExpression">
        <SqlExpression>MyProperty='XYZ'</SqlExpression>
    </Filter>
    <Action i:type="SqlFilterAction">
        <SqlExpression>set MyProperty2 = 'ABC'</SqlExpression>
    </Action>
</RuleDescription>
</content>
</entry>
    '''
    xmldoc = minidom.parseString(xmlstr)
    rule = Rule()

    for rule_desc in _get_children_from_path(xmldoc,
                                             'entry',
                                             'content',
                                             'RuleDescription'):
        for xml_filter in _get_child_nodes(rule_desc, 'Filter'):
            filter_type = xml_filter.getAttributeNS(
                XML_SCHEMA_NAMESPACE, 'type')
            setattr(rule, 'filter_type', str(filter_type))
            if xml_filter.childNodes:

                for expr in _get_child_nodes(xml_filter, 'SqlExpression'):
                    setattr(rule, 'filter_expression',
                            expr.firstChild.nodeValue)

        for xml_action in _get_child_nodes(rule_desc, 'Action'):
            action_type = xml_action.getAttributeNS(
                XML_SCHEMA_NAMESPACE, 'type')
            setattr(rule, 'action_type', str(action_type))
            if xml_action.childNodes:
                action_expression = xml_action.childNodes[0].firstChild
                if action_expression:
                    setattr(rule, 'action_expression',
                            action_expression.nodeValue)

    # extract id, updated and name value from feed entry and set them of rule.
    for name, value in _get_entry_properties(xmlstr, True, '/rules').items():
        setattr(rule, name, value)

    return rule


def _convert_response_to_queue(response):
    return _convert_xml_to_queue(response.body)


def _parse_bool(value):
    if value.lower() == 'true':
        return True
    return False


def _convert_xml_to_queue(xmlstr):
    ''' Converts xml response to queue object.

    The format of xml response for queue:
<QueueDescription
    xmlns=\"http://schemas.microsoft.com/netservices/2010/10/servicebus/connect\">
    <MaxSizeInBytes>10000</MaxSizeInBytes>
    <DefaultMessageTimeToLive>PT5M</DefaultMessageTimeToLive>
    <LockDuration>PT2M</LockDuration>
    <RequiresGroupedReceives>False</RequiresGroupedReceives>
    <SupportsDuplicateDetection>False</SupportsDuplicateDetection>
    ...
</QueueDescription>

    '''
    xmldoc = minidom.parseString(xmlstr)
    queue = Queue()

    invalid_queue = True
    # get node for each attribute in Queue class, if nothing found then the
    # response is not valid xml for Queue.
    for desc in _get_children_from_path(xmldoc,
                                        'entry',
                                        'content',
                                        'QueueDescription'):
        node_value = _get_first_child_node_value(desc, 'LockDuration')
        if node_value is not None:
            queue.lock_duration = node_value
            invalid_queue = False

        node_value = _get_first_child_node_value(desc, 'MaxSizeInMegabytes')
        if node_value is not None:
            queue.max_size_in_megabytes = int(node_value)
            invalid_queue = False

        node_value = _get_first_child_node_value(
            desc, 'RequiresDuplicateDetection')
        if node_value is not None:
            queue.requires_duplicate_detection = _parse_bool(node_value)
            invalid_queue = False

        node_value = _get_first_child_node_value(desc, 'RequiresSession')
        if node_value is not None:
            queue.requires_session = _parse_bool(node_value)
            invalid_queue = False

        node_value = _get_first_child_node_value(
            desc, 'DefaultMessageTimeToLive')
        if node_value is not None:
            queue.default_message_time_to_live = node_value
            invalid_queue = False

        node_value = _get_first_child_node_value(
            desc, 'DeadLetteringOnMessageExpiration')
        if node_value is not None:
            queue.dead_lettering_on_message_expiration = _parse_bool(node_value)
            invalid_queue = False

        node_value = _get_first_child_node_value(
            desc, 'DuplicateDetectionHistoryTimeWindow')
        if node_value is not None:
            queue.duplicate_detection_history_time_window = node_value
            invalid_queue = False

        node_value = _get_first_child_node_value(
            desc, 'EnableBatchedOperations')
        if node_value is not None:
            queue.enable_batched_operations = _parse_bool(node_value)
            invalid_queue = False

        node_value = _get_first_child_node_value(desc, 'MaxDeliveryCount')
        if node_value is not None:
            queue.max_delivery_count = int(node_value)
            invalid_queue = False

        node_value = _get_first_child_node_value(desc, 'MessageCount')
        if node_value is not None:
            queue.message_count = int(node_value)
            invalid_queue = False

        node_value = _get_first_child_node_value(desc, 'SizeInBytes')
        if node_value is not None:
            queue.size_in_bytes = int(node_value)
            invalid_queue = False

    if invalid_queue:
        raise WindowsAzureError(_ERROR_QUEUE_NOT_FOUND)

    # extract id, updated and name value from feed entry and set them of queue.
    for name, value in _get_entry_properties(xmlstr, True).items():
        setattr(queue, name, value)

    return queue


def _convert_response_to_topic(response):
    return _convert_xml_to_topic(response.body)


def _convert_xml_to_topic(xmlstr):
    '''Converts xml response to topic

    The xml format for topic:
<entry xmlns='http://www.w3.org/2005/Atom'>
    <content type='application/xml'>
    <TopicDescription
        xmlns:i="http://www.w3.org/2001/XMLSchema-instance"
        xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
        <DefaultMessageTimeToLive>P10675199DT2H48M5.4775807S</DefaultMessageTimeToLive>
        <MaxSizeInMegabytes>1024</MaxSizeInMegabytes>
        <RequiresDuplicateDetection>false</RequiresDuplicateDetection>
        <DuplicateDetectionHistoryTimeWindow>P7D</DuplicateDetectionHistoryTimeWindow>
        <DeadLetteringOnFilterEvaluationExceptions>true</DeadLetteringOnFilterEvaluationExceptions>
    </TopicDescription>
    </content>
</entry>
    '''
    xmldoc = minidom.parseString(xmlstr)
    topic = Topic()

    invalid_topic = True

    # get node for each attribute in Topic class, if nothing found then the
    # response is not valid xml for Topic.
    for desc in _get_children_from_path(xmldoc,
                                        'entry',
                                        'content',
                                        'TopicDescription'):
        invalid_topic = True
        node_value = _get_first_child_node_value(
            desc, 'DefaultMessageTimeToLive')
        if node_value is not None:
            topic.default_message_time_to_live = node_value
            invalid_topic = False
        node_value = _get_first_child_node_value(desc, 'MaxSizeInMegabytes')
        if node_value is not None:
            topic.max_size_in_megabytes = int(node_value)
            invalid_topic = False
        node_value = _get_first_child_node_value(
            desc, 'RequiresDuplicateDetection')
        if node_value is not None:
            topic.requires_duplicate_detection = _parse_bool(node_value)
            invalid_topic = False
        node_value = _get_first_child_node_value(
            desc, 'DuplicateDetectionHistoryTimeWindow')
        if node_value is not None:
            topic.duplicate_detection_history_time_window = node_value
            invalid_topic = False
        node_value = _get_first_child_node_value(
            desc, 'EnableBatchedOperations')
        if node_value is not None:
            topic.enable_batched_operations = _parse_bool(node_value)
            invalid_topic = False
        node_value = _get_first_child_node_value(desc, 'SizeInBytes')
        if node_value is not None:
            topic.size_in_bytes = int(node_value)
            invalid_topic = False

    if invalid_topic:
        raise WindowsAzureError(_ERROR_TOPIC_NOT_FOUND)

    # extract id, updated and name value from feed entry and set them of topic.
    for name, value in _get_entry_properties(xmlstr, True).items():
        setattr(topic, name, value)
    return topic


def _convert_response_to_subscription(response):
    return _convert_xml_to_subscription(response.body)


def _convert_xml_to_subscription(xmlstr):
    '''Converts xml response to subscription

    The xml format for subscription:
<entry xmlns='http://www.w3.org/2005/Atom'>
    <content type='application/xml'>
    <SubscriptionDescription
        xmlns:i="http://www.w3.org/2001/XMLSchema-instance"
        xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
        <LockDuration>PT5M</LockDuration>
        <RequiresSession>false</RequiresSession>
        <DefaultMessageTimeToLive>P10675199DT2H48M5.4775807S</DefaultMessageTimeToLive>
        <DeadLetteringOnMessageExpiration>false</DeadLetteringOnMessageExpiration>
        <DeadLetteringOnFilterEvaluationExceptions>true</DeadLetteringOnFilterEvaluationExceptions>
    </SubscriptionDescription>
    </content>
</entry>
    '''
    xmldoc = minidom.parseString(xmlstr)
    subscription = Subscription()

    for desc in _get_children_from_path(xmldoc,
                                        'entry',
                                        'content',
                                        'SubscriptionDescription'):
        node_value = _get_first_child_node_value(desc, 'LockDuration')
        if node_value is not None:
            subscription.lock_duration = node_value

        node_value = _get_first_child_node_value(
            desc, 'RequiresSession')
        if node_value is not None:
            subscription.requires_session = _parse_bool(node_value)

        node_value = _get_first_child_node_value(
            desc, 'DefaultMessageTimeToLive')
        if node_value is not None:
            subscription.default_message_time_to_live = node_value

        node_value = _get_first_child_node_value(
            desc, 'DeadLetteringOnFilterEvaluationExceptions')
        if node_value is not None:
            subscription.dead_lettering_on_filter_evaluation_exceptions = \
                _parse_bool(node_value)

        node_value = _get_first_child_node_value(
            desc, 'DeadLetteringOnMessageExpiration')
        if node_value is not None:
            subscription.dead_lettering_on_message_expiration = \
                _parse_bool(node_value)

        node_value = _get_first_child_node_value(
            desc, 'EnableBatchedOperations')
        if node_value is not None:
            subscription.enable_batched_operations = _parse_bool(node_value)

        node_value = _get_first_child_node_value(
            desc, 'MaxDeliveryCount')
        if node_value is not None:
            subscription.max_delivery_count = int(node_value)

        node_value = _get_first_child_node_value(
            desc, 'MessageCount')
        if node_value is not None:
            subscription.message_count = int(node_value)

    for name, value in _get_entry_properties(xmlstr,
                                             True,
                                             '/subscriptions').items():
        setattr(subscription, name, value)

    return subscription


def _convert_subscription_to_xml(subscription):
    '''
    Converts a subscription object to xml to send.  The order of each field of
    subscription in xml is very important so we can't simple call
    convert_class_to_xml.

    subscription: the subsciption object to be converted.
    '''

    subscription_body = '<SubscriptionDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">'
    if subscription:
        if subscription.lock_duration is not None:
            subscription_body += ''.join(
                ['<LockDuration>',
                 str(subscription.lock_duration),
                 '</LockDuration>'])

        if subscription.requires_session is not None:
            subscription_body += ''.join(
                ['<RequiresSession>',
                 str(subscription.requires_session).lower(),
                 '</RequiresSession>'])

        if subscription.default_message_time_to_live is not None:
            subscription_body += ''.join(
                ['<DefaultMessageTimeToLive>',
                 str(subscription.default_message_time_to_live),
                 '</DefaultMessageTimeToLive>'])

        if subscription.dead_lettering_on_message_expiration is not None:
            subscription_body += ''.join(
                ['<DeadLetteringOnMessageExpiration>',
                 str(subscription.dead_lettering_on_message_expiration).lower(),
                 '</DeadLetteringOnMessageExpiration>'])

        if subscription.dead_lettering_on_filter_evaluation_exceptions is not None:
            subscription_body += ''.join(
                ['<DeadLetteringOnFilterEvaluationExceptions>',
                 str(subscription.dead_lettering_on_filter_evaluation_exceptions).lower(),
                 '</DeadLetteringOnFilterEvaluationExceptions>'])

        if subscription.enable_batched_operations is not None:
            subscription_body += ''.join(
                ['<EnableBatchedOperations>',
                 str(subscription.enable_batched_operations).lower(),
                 '</EnableBatchedOperations>'])

        if subscription.max_delivery_count is not None:
            subscription_body += ''.join(
                ['<MaxDeliveryCount>',
                 str(subscription.max_delivery_count),
                 '</MaxDeliveryCount>'])

        if subscription.message_count is not None:
            subscription_body += ''.join(
                ['<MessageCount>',
                 str(subscription.message_count),
                 '</MessageCount>'])

    subscription_body += '</SubscriptionDescription>'
    return _create_entry(subscription_body)


def _convert_rule_to_xml(rule):
    '''
    Converts a rule object to xml to send.  The order of each field of rule
    in xml is very important so we cann't simple call convert_class_to_xml.

    rule: the rule object to be converted.
    '''
    rule_body = '<RuleDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">'
    if rule:
        if rule.filter_type:
            rule_body += ''.join(
                ['<Filter i:type="',
                 xml_escape(rule.filter_type),
                 '">'])
            if rule.filter_type == 'CorrelationFilter':
                rule_body += ''.join(
                    ['<CorrelationId>',
                     xml_escape(rule.filter_expression),
                     '</CorrelationId>'])
            else:
                rule_body += ''.join(
                    ['<SqlExpression>',
                     xml_escape(rule.filter_expression),
                     '</SqlExpression>'])
                rule_body += '<CompatibilityLevel>20</CompatibilityLevel>'
            rule_body += '</Filter>'
        if rule.action_type:
            rule_body += ''.join(
                ['<Action i:type="',
                 xml_escape(rule.action_type),
                 '">'])
            if rule.action_type == 'SqlRuleAction':
                rule_body += ''.join(
                    ['<SqlExpression>',
                     xml_escape(rule.action_expression),
                     '</SqlExpression>'])
                rule_body += '<CompatibilityLevel>20</CompatibilityLevel>'
            rule_body += '</Action>'
    rule_body += '</RuleDescription>'

    return _create_entry(rule_body)


def _convert_topic_to_xml(topic):
    '''
    Converts a topic object to xml to send.  The order of each field of topic
    in xml is very important so we cann't simple call convert_class_to_xml.

    topic: the topic object to be converted.
    '''

    topic_body = '<TopicDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">'
    if topic:
        if topic.default_message_time_to_live is not None:
            topic_body += ''.join(
                ['<DefaultMessageTimeToLive>',
                 str(topic.default_message_time_to_live),
                 '</DefaultMessageTimeToLive>'])

        if topic.max_size_in_megabytes is not None:
            topic_body += ''.join(
                ['<MaxSizeInMegabytes>',
                 str(topic.max_size_in_megabytes),
                 '</MaxSizeInMegabytes>'])

        if topic.requires_duplicate_detection is not None:
            topic_body += ''.join(
                ['<RequiresDuplicateDetection>',
                 str(topic.requires_duplicate_detection).lower(),
                 '</RequiresDuplicateDetection>'])

        if topic.duplicate_detection_history_time_window is not None:
            topic_body += ''.join(
                ['<DuplicateDetectionHistoryTimeWindow>',
                 str(topic.duplicate_detection_history_time_window),
                 '</DuplicateDetectionHistoryTimeWindow>'])

        if topic.enable_batched_operations is not None:
            topic_body += ''.join(
                ['<EnableBatchedOperations>',
                 str(topic.enable_batched_operations).lower(),
                 '</EnableBatchedOperations>'])

        if topic.size_in_bytes is not None:
            topic_body += ''.join(
                ['<SizeInBytes>',
                 str(topic.size_in_bytes),
                 '</SizeInBytes>'])

    topic_body += '</TopicDescription>'

    return _create_entry(topic_body)


def _convert_queue_to_xml(queue):
    '''
    Converts a queue object to xml to send.  The order of each field of queue
    in xml is very important so we cann't simple call convert_class_to_xml.

    queue: the queue object to be converted.
    '''
    queue_body = '<QueueDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">'
    if queue:
        if queue.lock_duration:
            queue_body += ''.join(
                ['<LockDuration>',
                 str(queue.lock_duration),
                 '</LockDuration>'])

        if queue.max_size_in_megabytes is not None:
            queue_body += ''.join(
                ['<MaxSizeInMegabytes>',
                 str(queue.max_size_in_megabytes),
                 '</MaxSizeInMegabytes>'])

        if queue.requires_duplicate_detection is not None:
            queue_body += ''.join(
                ['<RequiresDuplicateDetection>',
                 str(queue.requires_duplicate_detection).lower(),
                 '</RequiresDuplicateDetection>'])

        if queue.requires_session is not None:
            queue_body += ''.join(
                ['<RequiresSession>',
                 str(queue.requires_session).lower(),
                 '</RequiresSession>'])

        if queue.default_message_time_to_live is not None:
            queue_body += ''.join(
                ['<DefaultMessageTimeToLive>',
                 str(queue.default_message_time_to_live),
                 '</DefaultMessageTimeToLive>'])

        if queue.dead_lettering_on_message_expiration is not None:
            queue_body += ''.join(
                ['<DeadLetteringOnMessageExpiration>',
                 str(queue.dead_lettering_on_message_expiration).lower(),
                 '</DeadLetteringOnMessageExpiration>'])

        if queue.duplicate_detection_history_time_window is not None:
            queue_body += ''.join(
                ['<DuplicateDetectionHistoryTimeWindow>',
                 str(queue.duplicate_detection_history_time_window),
                 '</DuplicateDetectionHistoryTimeWindow>'])

        if queue.max_delivery_count is not None:
            queue_body += ''.join(
                ['<MaxDeliveryCount>',
                 str(queue.max_delivery_count),
                 '</MaxDeliveryCount>'])

        if queue.enable_batched_operations is not None:
            queue_body += ''.join(
                ['<EnableBatchedOperations>',
                 str(queue.enable_batched_operations).lower(),
                 '</EnableBatchedOperations>'])

        if queue.size_in_bytes is not None:
            queue_body += ''.join(
                ['<SizeInBytes>',
                 str(queue.size_in_bytes),
                 '</SizeInBytes>'])

        if queue.message_count is not None:
            queue_body += ''.join(
                ['<MessageCount>',
                 str(queue.message_count),
                 '</MessageCount>'])

    queue_body += '</QueueDescription>'
    return _create_entry(queue_body)


def _service_bus_error_handler(http_error):
    ''' Simple error handler for service bus service. '''
    return _general_error_handler(http_error)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



OSPatching/azure/servicebus/__init__.py [22:850]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    WindowsAzureData,
    WindowsAzureError,
    xml_escape,
    _create_entry,
    _general_error_handler,
    _get_entry_properties,
    _get_child_nodes,
    _get_children_from_path,
    _get_first_child_node_value,
    _ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_DELETE,
    _ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_UNLOCK,
    _ERROR_QUEUE_NOT_FOUND,
    _ERROR_TOPIC_NOT_FOUND,
    )
from azure.http import HTTPError

# default rule name for subscription
DEFAULT_RULE_NAME = '$Default'

#-----------------------------------------------------------------------------
# Constants for Azure app environment settings.
AZURE_SERVICEBUS_NAMESPACE = 'AZURE_SERVICEBUS_NAMESPACE'
AZURE_SERVICEBUS_ACCESS_KEY = 'AZURE_SERVICEBUS_ACCESS_KEY'
AZURE_SERVICEBUS_ISSUER = 'AZURE_SERVICEBUS_ISSUER'

# namespace used for converting rules to objects
XML_SCHEMA_NAMESPACE = 'http://www.w3.org/2001/XMLSchema-instance'


class Queue(WindowsAzureData):

    ''' Queue class corresponding to Queue Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780773'''

    def __init__(self, lock_duration=None, max_size_in_megabytes=None,
                 requires_duplicate_detection=None, requires_session=None,
                 default_message_time_to_live=None,
                 dead_lettering_on_message_expiration=None,
                 duplicate_detection_history_time_window=None,
                 max_delivery_count=None, enable_batched_operations=None,
                 size_in_bytes=None, message_count=None):

        self.lock_duration = lock_duration
        self.max_size_in_megabytes = max_size_in_megabytes
        self.requires_duplicate_detection = requires_duplicate_detection
        self.requires_session = requires_session
        self.default_message_time_to_live = default_message_time_to_live
        self.dead_lettering_on_message_expiration = \
            dead_lettering_on_message_expiration
        self.duplicate_detection_history_time_window = \
            duplicate_detection_history_time_window
        self.max_delivery_count = max_delivery_count
        self.enable_batched_operations = enable_batched_operations
        self.size_in_bytes = size_in_bytes
        self.message_count = message_count


class Topic(WindowsAzureData):

    ''' Topic class corresponding to Topic Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780749. '''

    def __init__(self, default_message_time_to_live=None,
                 max_size_in_megabytes=None, requires_duplicate_detection=None,
                 duplicate_detection_history_time_window=None,
                 enable_batched_operations=None, size_in_bytes=None):

        self.default_message_time_to_live = default_message_time_to_live
        self.max_size_in_megabytes = max_size_in_megabytes
        self.requires_duplicate_detection = requires_duplicate_detection
        self.duplicate_detection_history_time_window = \
            duplicate_detection_history_time_window
        self.enable_batched_operations = enable_batched_operations
        self.size_in_bytes = size_in_bytes

    @property
    def max_size_in_mega_bytes(self):
        import warnings
        warnings.warn(
            'This attribute has been changed to max_size_in_megabytes.')
        return self.max_size_in_megabytes

    @max_size_in_mega_bytes.setter
    def max_size_in_mega_bytes(self, value):
        self.max_size_in_megabytes = value


class Subscription(WindowsAzureData):

    ''' Subscription class corresponding to Subscription Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780763. '''

    def __init__(self, lock_duration=None, requires_session=None,
                 default_message_time_to_live=None,
                 dead_lettering_on_message_expiration=None,
                 dead_lettering_on_filter_evaluation_exceptions=None,
                 enable_batched_operations=None, max_delivery_count=None,
                 message_count=None):

        self.lock_duration = lock_duration
        self.requires_session = requires_session
        self.default_message_time_to_live = default_message_time_to_live
        self.dead_lettering_on_message_expiration = \
            dead_lettering_on_message_expiration
        self.dead_lettering_on_filter_evaluation_exceptions = \
            dead_lettering_on_filter_evaluation_exceptions
        self.enable_batched_operations = enable_batched_operations
        self.max_delivery_count = max_delivery_count
        self.message_count = message_count


class Rule(WindowsAzureData):

    ''' Rule class corresponding to Rule Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780753. '''

    def __init__(self, filter_type=None, filter_expression=None,
                 action_type=None, action_expression=None):
        self.filter_type = filter_type
        self.filter_expression = filter_expression
        self.action_type = action_type
        self.action_expression = action_type


class Message(WindowsAzureData):

    ''' Message class that used in send message/get mesage apis. '''

    def __init__(self, body=None, service_bus_service=None, location=None,
                 custom_properties=None,
                 type='application/atom+xml;type=entry;charset=utf-8',
                 broker_properties=None):
        self.body = body
        self.location = location
        self.broker_properties = broker_properties
        self.custom_properties = custom_properties
        self.type = type
        self.service_bus_service = service_bus_service
        self._topic_name = None
        self._subscription_name = None
        self._queue_name = None

        if not service_bus_service:
            return

        # if location is set, then extracts the queue name for queue message and
        # extracts the topic and subscriptions name if it is topic message.
        if location:
            if '/subscriptions/' in location:
                pos = location.find('/subscriptions/')
                pos1 = location.rfind('/', 0, pos - 1)
                self._topic_name = location[pos1 + 1:pos]
                pos += len('/subscriptions/')
                pos1 = location.find('/', pos)
                self._subscription_name = location[pos:pos1]
            elif '/messages/' in location:
                pos = location.find('/messages/')
                pos1 = location.rfind('/', 0, pos - 1)
                self._queue_name = location[pos1 + 1:pos]

    def delete(self):
        ''' Deletes itself if find queue name or topic name and subscription
        name. '''
        if self._queue_name:
            self.service_bus_service.delete_queue_message(
                self._queue_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        elif self._topic_name and self._subscription_name:
            self.service_bus_service.delete_subscription_message(
                self._topic_name,
                self._subscription_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        else:
            raise WindowsAzureError(_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_DELETE)

    def unlock(self):
        ''' Unlocks itself if find queue name or topic name and subscription
        name. '''
        if self._queue_name:
            self.service_bus_service.unlock_queue_message(
                self._queue_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        elif self._topic_name and self._subscription_name:
            self.service_bus_service.unlock_subscription_message(
                self._topic_name,
                self._subscription_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        else:
            raise WindowsAzureError(_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_UNLOCK)

    def add_headers(self, request):
        ''' add addtional headers to request for message request.'''

        # Adds custom properties
        if self.custom_properties:
            for name, value in self.custom_properties.items():
                if sys.version_info < (3,) and isinstance(value, unicode):
                    request.headers.append(
                        (name, '"' + value.encode('utf-8') + '"'))
                elif isinstance(value, str):
                    request.headers.append((name, '"' + str(value) + '"'))
                elif isinstance(value, datetime):
                    request.headers.append(
                        (name, '"' + value.strftime('%a, %d %b %Y %H:%M:%S GMT') + '"'))
                else:
                    request.headers.append((name, str(value).lower()))

        # Adds content-type
        request.headers.append(('Content-Type', self.type))

        # Adds BrokerProperties
        if self.broker_properties:
            request.headers.append(
                ('BrokerProperties', str(self.broker_properties)))

        return request.headers


def _create_message(response, service_instance):
    ''' Create message from response.

    response: response from service bus cloud server.
    service_instance: the service bus client.
    '''
    respbody = response.body
    custom_properties = {}
    broker_properties = None
    message_type = None
    message_location = None

    # gets all information from respheaders.
    for name, value in response.headers:
        if name.lower() == 'brokerproperties':
            broker_properties = json.loads(value)
        elif name.lower() == 'content-type':
            message_type = value
        elif name.lower() == 'location':
            message_location = value
        elif name.lower() not in ['content-type',
                                  'brokerproperties',
                                  'transfer-encoding',
                                  'server',
                                  'location',
                                  'date']:
            if '"' in value:
                value = value[1:-1]
                try:
                    custom_properties[name] = datetime.strptime(
                        value, '%a, %d %b %Y %H:%M:%S GMT')
                except ValueError:
                    custom_properties[name] = value
            else:  # only int, float or boolean
                if value.lower() == 'true':
                    custom_properties[name] = True
                elif value.lower() == 'false':
                    custom_properties[name] = False
                # int('3.1') doesn't work so need to get float('3.14') first
                elif str(int(float(value))) == value:
                    custom_properties[name] = int(value)
                else:
                    custom_properties[name] = float(value)

    if message_type == None:
        message = Message(
            respbody, service_instance, message_location, custom_properties,
            'application/atom+xml;type=entry;charset=utf-8', broker_properties)
    else:
        message = Message(respbody, service_instance, message_location,
                          custom_properties, message_type, broker_properties)
    return message

# convert functions


def _convert_response_to_rule(response):
    return _convert_xml_to_rule(response.body)


def _convert_xml_to_rule(xmlstr):
    ''' Converts response xml to rule object.

    The format of xml for rule:
<entry xmlns='http://www.w3.org/2005/Atom'>
<content type='application/xml'>
<RuleDescription
    xmlns:i="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
    <Filter i:type="SqlFilterExpression">
        <SqlExpression>MyProperty='XYZ'</SqlExpression>
    </Filter>
    <Action i:type="SqlFilterAction">
        <SqlExpression>set MyProperty2 = 'ABC'</SqlExpression>
    </Action>
</RuleDescription>
</content>
</entry>
    '''
    xmldoc = minidom.parseString(xmlstr)
    rule = Rule()

    for rule_desc in _get_children_from_path(xmldoc,
                                             'entry',
                                             'content',
                                             'RuleDescription'):
        for xml_filter in _get_child_nodes(rule_desc, 'Filter'):
            filter_type = xml_filter.getAttributeNS(
                XML_SCHEMA_NAMESPACE, 'type')
            setattr(rule, 'filter_type', str(filter_type))
            if xml_filter.childNodes:

                for expr in _get_child_nodes(xml_filter, 'SqlExpression'):
                    setattr(rule, 'filter_expression',
                            expr.firstChild.nodeValue)

        for xml_action in _get_child_nodes(rule_desc, 'Action'):
            action_type = xml_action.getAttributeNS(
                XML_SCHEMA_NAMESPACE, 'type')
            setattr(rule, 'action_type', str(action_type))
            if xml_action.childNodes:
                action_expression = xml_action.childNodes[0].firstChild
                if action_expression:
                    setattr(rule, 'action_expression',
                            action_expression.nodeValue)

    # extract id, updated and name value from feed entry and set them of rule.
    for name, value in _get_entry_properties(xmlstr, True, '/rules').items():
        setattr(rule, name, value)

    return rule


def _convert_response_to_queue(response):
    return _convert_xml_to_queue(response.body)


def _parse_bool(value):
    if value.lower() == 'true':
        return True
    return False


def _convert_xml_to_queue(xmlstr):
    ''' Converts xml response to queue object.

    The format of xml response for queue:
<QueueDescription
    xmlns=\"http://schemas.microsoft.com/netservices/2010/10/servicebus/connect\">
    <MaxSizeInBytes>10000</MaxSizeInBytes>
    <DefaultMessageTimeToLive>PT5M</DefaultMessageTimeToLive>
    <LockDuration>PT2M</LockDuration>
    <RequiresGroupedReceives>False</RequiresGroupedReceives>
    <SupportsDuplicateDetection>False</SupportsDuplicateDetection>
    ...
</QueueDescription>

    '''
    xmldoc = minidom.parseString(xmlstr)
    queue = Queue()

    invalid_queue = True
    # get node for each attribute in Queue class, if nothing found then the
    # response is not valid xml for Queue.
    for desc in _get_children_from_path(xmldoc,
                                        'entry',
                                        'content',
                                        'QueueDescription'):
        node_value = _get_first_child_node_value(desc, 'LockDuration')
        if node_value is not None:
            queue.lock_duration = node_value
            invalid_queue = False

        node_value = _get_first_child_node_value(desc, 'MaxSizeInMegabytes')
        if node_value is not None:
            queue.max_size_in_megabytes = int(node_value)
            invalid_queue = False

        node_value = _get_first_child_node_value(
            desc, 'RequiresDuplicateDetection')
        if node_value is not None:
            queue.requires_duplicate_detection = _parse_bool(node_value)
            invalid_queue = False

        node_value = _get_first_child_node_value(desc, 'RequiresSession')
        if node_value is not None:
            queue.requires_session = _parse_bool(node_value)
            invalid_queue = False

        node_value = _get_first_child_node_value(
            desc, 'DefaultMessageTimeToLive')
        if node_value is not None:
            queue.default_message_time_to_live = node_value
            invalid_queue = False

        node_value = _get_first_child_node_value(
            desc, 'DeadLetteringOnMessageExpiration')
        if node_value is not None:
            queue.dead_lettering_on_message_expiration = _parse_bool(node_value)
            invalid_queue = False

        node_value = _get_first_child_node_value(
            desc, 'DuplicateDetectionHistoryTimeWindow')
        if node_value is not None:
            queue.duplicate_detection_history_time_window = node_value
            invalid_queue = False

        node_value = _get_first_child_node_value(
            desc, 'EnableBatchedOperations')
        if node_value is not None:
            queue.enable_batched_operations = _parse_bool(node_value)
            invalid_queue = False

        node_value = _get_first_child_node_value(desc, 'MaxDeliveryCount')
        if node_value is not None:
            queue.max_delivery_count = int(node_value)
            invalid_queue = False

        node_value = _get_first_child_node_value(desc, 'MessageCount')
        if node_value is not None:
            queue.message_count = int(node_value)
            invalid_queue = False

        node_value = _get_first_child_node_value(desc, 'SizeInBytes')
        if node_value is not None:
            queue.size_in_bytes = int(node_value)
            invalid_queue = False

    if invalid_queue:
        raise WindowsAzureError(_ERROR_QUEUE_NOT_FOUND)

    # extract id, updated and name value from feed entry and set them of queue.
    for name, value in _get_entry_properties(xmlstr, True).items():
        setattr(queue, name, value)

    return queue


def _convert_response_to_topic(response):
    return _convert_xml_to_topic(response.body)


def _convert_xml_to_topic(xmlstr):
    '''Converts xml response to topic

    The xml format for topic:
<entry xmlns='http://www.w3.org/2005/Atom'>
    <content type='application/xml'>
    <TopicDescription
        xmlns:i="http://www.w3.org/2001/XMLSchema-instance"
        xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
        <DefaultMessageTimeToLive>P10675199DT2H48M5.4775807S</DefaultMessageTimeToLive>
        <MaxSizeInMegabytes>1024</MaxSizeInMegabytes>
        <RequiresDuplicateDetection>false</RequiresDuplicateDetection>
        <DuplicateDetectionHistoryTimeWindow>P7D</DuplicateDetectionHistoryTimeWindow>
        <DeadLetteringOnFilterEvaluationExceptions>true</DeadLetteringOnFilterEvaluationExceptions>
    </TopicDescription>
    </content>
</entry>
    '''
    xmldoc = minidom.parseString(xmlstr)
    topic = Topic()

    invalid_topic = True

    # get node for each attribute in Topic class, if nothing found then the
    # response is not valid xml for Topic.
    for desc in _get_children_from_path(xmldoc,
                                        'entry',
                                        'content',
                                        'TopicDescription'):
        invalid_topic = True
        node_value = _get_first_child_node_value(
            desc, 'DefaultMessageTimeToLive')
        if node_value is not None:
            topic.default_message_time_to_live = node_value
            invalid_topic = False
        node_value = _get_first_child_node_value(desc, 'MaxSizeInMegabytes')
        if node_value is not None:
            topic.max_size_in_megabytes = int(node_value)
            invalid_topic = False
        node_value = _get_first_child_node_value(
            desc, 'RequiresDuplicateDetection')
        if node_value is not None:
            topic.requires_duplicate_detection = _parse_bool(node_value)
            invalid_topic = False
        node_value = _get_first_child_node_value(
            desc, 'DuplicateDetectionHistoryTimeWindow')
        if node_value is not None:
            topic.duplicate_detection_history_time_window = node_value
            invalid_topic = False
        node_value = _get_first_child_node_value(
            desc, 'EnableBatchedOperations')
        if node_value is not None:
            topic.enable_batched_operations = _parse_bool(node_value)
            invalid_topic = False
        node_value = _get_first_child_node_value(desc, 'SizeInBytes')
        if node_value is not None:
            topic.size_in_bytes = int(node_value)
            invalid_topic = False

    if invalid_topic:
        raise WindowsAzureError(_ERROR_TOPIC_NOT_FOUND)

    # extract id, updated and name value from feed entry and set them of topic.
    for name, value in _get_entry_properties(xmlstr, True).items():
        setattr(topic, name, value)
    return topic


def _convert_response_to_subscription(response):
    return _convert_xml_to_subscription(response.body)


def _convert_xml_to_subscription(xmlstr):
    '''Converts xml response to subscription

    The xml format for subscription:
<entry xmlns='http://www.w3.org/2005/Atom'>
    <content type='application/xml'>
    <SubscriptionDescription
        xmlns:i="http://www.w3.org/2001/XMLSchema-instance"
        xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
        <LockDuration>PT5M</LockDuration>
        <RequiresSession>false</RequiresSession>
        <DefaultMessageTimeToLive>P10675199DT2H48M5.4775807S</DefaultMessageTimeToLive>
        <DeadLetteringOnMessageExpiration>false</DeadLetteringOnMessageExpiration>
        <DeadLetteringOnFilterEvaluationExceptions>true</DeadLetteringOnFilterEvaluationExceptions>
    </SubscriptionDescription>
    </content>
</entry>
    '''
    xmldoc = minidom.parseString(xmlstr)
    subscription = Subscription()

    for desc in _get_children_from_path(xmldoc,
                                        'entry',
                                        'content',
                                        'SubscriptionDescription'):
        node_value = _get_first_child_node_value(desc, 'LockDuration')
        if node_value is not None:
            subscription.lock_duration = node_value

        node_value = _get_first_child_node_value(
            desc, 'RequiresSession')
        if node_value is not None:
            subscription.requires_session = _parse_bool(node_value)

        node_value = _get_first_child_node_value(
            desc, 'DefaultMessageTimeToLive')
        if node_value is not None:
            subscription.default_message_time_to_live = node_value

        node_value = _get_first_child_node_value(
            desc, 'DeadLetteringOnFilterEvaluationExceptions')
        if node_value is not None:
            subscription.dead_lettering_on_filter_evaluation_exceptions = \
                _parse_bool(node_value)

        node_value = _get_first_child_node_value(
            desc, 'DeadLetteringOnMessageExpiration')
        if node_value is not None:
            subscription.dead_lettering_on_message_expiration = \
                _parse_bool(node_value)

        node_value = _get_first_child_node_value(
            desc, 'EnableBatchedOperations')
        if node_value is not None:
            subscription.enable_batched_operations = _parse_bool(node_value)

        node_value = _get_first_child_node_value(
            desc, 'MaxDeliveryCount')
        if node_value is not None:
            subscription.max_delivery_count = int(node_value)

        node_value = _get_first_child_node_value(
            desc, 'MessageCount')
        if node_value is not None:
            subscription.message_count = int(node_value)

    for name, value in _get_entry_properties(xmlstr,
                                             True,
                                             '/subscriptions').items():
        setattr(subscription, name, value)

    return subscription


def _convert_subscription_to_xml(subscription):
    '''
    Converts a subscription object to xml to send.  The order of each field of
    subscription in xml is very important so we can't simple call
    convert_class_to_xml.

    subscription: the subsciption object to be converted.
    '''

    subscription_body = '<SubscriptionDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">'
    if subscription:
        if subscription.lock_duration is not None:
            subscription_body += ''.join(
                ['<LockDuration>',
                 str(subscription.lock_duration),
                 '</LockDuration>'])

        if subscription.requires_session is not None:
            subscription_body += ''.join(
                ['<RequiresSession>',
                 str(subscription.requires_session).lower(),
                 '</RequiresSession>'])

        if subscription.default_message_time_to_live is not None:
            subscription_body += ''.join(
                ['<DefaultMessageTimeToLive>',
                 str(subscription.default_message_time_to_live),
                 '</DefaultMessageTimeToLive>'])

        if subscription.dead_lettering_on_message_expiration is not None:
            subscription_body += ''.join(
                ['<DeadLetteringOnMessageExpiration>',
                 str(subscription.dead_lettering_on_message_expiration).lower(),
                 '</DeadLetteringOnMessageExpiration>'])

        if subscription.dead_lettering_on_filter_evaluation_exceptions is not None:
            subscription_body += ''.join(
                ['<DeadLetteringOnFilterEvaluationExceptions>',
                 str(subscription.dead_lettering_on_filter_evaluation_exceptions).lower(),
                 '</DeadLetteringOnFilterEvaluationExceptions>'])

        if subscription.enable_batched_operations is not None:
            subscription_body += ''.join(
                ['<EnableBatchedOperations>',
                 str(subscription.enable_batched_operations).lower(),
                 '</EnableBatchedOperations>'])

        if subscription.max_delivery_count is not None:
            subscription_body += ''.join(
                ['<MaxDeliveryCount>',
                 str(subscription.max_delivery_count),
                 '</MaxDeliveryCount>'])

        if subscription.message_count is not None:
            subscription_body += ''.join(
                ['<MessageCount>',
                 str(subscription.message_count),
                 '</MessageCount>'])

    subscription_body += '</SubscriptionDescription>'
    return _create_entry(subscription_body)


def _convert_rule_to_xml(rule):
    '''
    Converts a rule object to xml to send.  The order of each field of rule
    in xml is very important so we cann't simple call convert_class_to_xml.

    rule: the rule object to be converted.
    '''
    rule_body = '<RuleDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">'
    if rule:
        if rule.filter_type:
            rule_body += ''.join(
                ['<Filter i:type="',
                 xml_escape(rule.filter_type),
                 '">'])
            if rule.filter_type == 'CorrelationFilter':
                rule_body += ''.join(
                    ['<CorrelationId>',
                     xml_escape(rule.filter_expression),
                     '</CorrelationId>'])
            else:
                rule_body += ''.join(
                    ['<SqlExpression>',
                     xml_escape(rule.filter_expression),
                     '</SqlExpression>'])
                rule_body += '<CompatibilityLevel>20</CompatibilityLevel>'
            rule_body += '</Filter>'
        if rule.action_type:
            rule_body += ''.join(
                ['<Action i:type="',
                 xml_escape(rule.action_type),
                 '">'])
            if rule.action_type == 'SqlRuleAction':
                rule_body += ''.join(
                    ['<SqlExpression>',
                     xml_escape(rule.action_expression),
                     '</SqlExpression>'])
                rule_body += '<CompatibilityLevel>20</CompatibilityLevel>'
            rule_body += '</Action>'
    rule_body += '</RuleDescription>'

    return _create_entry(rule_body)


def _convert_topic_to_xml(topic):
    '''
    Converts a topic object to xml to send.  The order of each field of topic
    in xml is very important so we cann't simple call convert_class_to_xml.

    topic: the topic object to be converted.
    '''

    topic_body = '<TopicDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">'
    if topic:
        if topic.default_message_time_to_live is not None:
            topic_body += ''.join(
                ['<DefaultMessageTimeToLive>',
                 str(topic.default_message_time_to_live),
                 '</DefaultMessageTimeToLive>'])

        if topic.max_size_in_megabytes is not None:
            topic_body += ''.join(
                ['<MaxSizeInMegabytes>',
                 str(topic.max_size_in_megabytes),
                 '</MaxSizeInMegabytes>'])

        if topic.requires_duplicate_detection is not None:
            topic_body += ''.join(
                ['<RequiresDuplicateDetection>',
                 str(topic.requires_duplicate_detection).lower(),
                 '</RequiresDuplicateDetection>'])

        if topic.duplicate_detection_history_time_window is not None:
            topic_body += ''.join(
                ['<DuplicateDetectionHistoryTimeWindow>',
                 str(topic.duplicate_detection_history_time_window),
                 '</DuplicateDetectionHistoryTimeWindow>'])

        if topic.enable_batched_operations is not None:
            topic_body += ''.join(
                ['<EnableBatchedOperations>',
                 str(topic.enable_batched_operations).lower(),
                 '</EnableBatchedOperations>'])

        if topic.size_in_bytes is not None:
            topic_body += ''.join(
                ['<SizeInBytes>',
                 str(topic.size_in_bytes),
                 '</SizeInBytes>'])

    topic_body += '</TopicDescription>'

    return _create_entry(topic_body)


def _convert_queue_to_xml(queue):
    '''
    Converts a queue object to xml to send.  The order of each field of queue
    in xml is very important so we cann't simple call convert_class_to_xml.

    queue: the queue object to be converted.
    '''
    queue_body = '<QueueDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">'
    if queue:
        if queue.lock_duration:
            queue_body += ''.join(
                ['<LockDuration>',
                 str(queue.lock_duration),
                 '</LockDuration>'])

        if queue.max_size_in_megabytes is not None:
            queue_body += ''.join(
                ['<MaxSizeInMegabytes>',
                 str(queue.max_size_in_megabytes),
                 '</MaxSizeInMegabytes>'])

        if queue.requires_duplicate_detection is not None:
            queue_body += ''.join(
                ['<RequiresDuplicateDetection>',
                 str(queue.requires_duplicate_detection).lower(),
                 '</RequiresDuplicateDetection>'])

        if queue.requires_session is not None:
            queue_body += ''.join(
                ['<RequiresSession>',
                 str(queue.requires_session).lower(),
                 '</RequiresSession>'])

        if queue.default_message_time_to_live is not None:
            queue_body += ''.join(
                ['<DefaultMessageTimeToLive>',
                 str(queue.default_message_time_to_live),
                 '</DefaultMessageTimeToLive>'])

        if queue.dead_lettering_on_message_expiration is not None:
            queue_body += ''.join(
                ['<DeadLetteringOnMessageExpiration>',
                 str(queue.dead_lettering_on_message_expiration).lower(),
                 '</DeadLetteringOnMessageExpiration>'])

        if queue.duplicate_detection_history_time_window is not None:
            queue_body += ''.join(
                ['<DuplicateDetectionHistoryTimeWindow>',
                 str(queue.duplicate_detection_history_time_window),
                 '</DuplicateDetectionHistoryTimeWindow>'])

        if queue.max_delivery_count is not None:
            queue_body += ''.join(
                ['<MaxDeliveryCount>',
                 str(queue.max_delivery_count),
                 '</MaxDeliveryCount>'])

        if queue.enable_batched_operations is not None:
            queue_body += ''.join(
                ['<EnableBatchedOperations>',
                 str(queue.enable_batched_operations).lower(),
                 '</EnableBatchedOperations>'])

        if queue.size_in_bytes is not None:
            queue_body += ''.join(
                ['<SizeInBytes>',
                 str(queue.size_in_bytes),
                 '</SizeInBytes>'])

        if queue.message_count is not None:
            queue_body += ''.join(
                ['<MessageCount>',
                 str(queue.message_count),
                 '</MessageCount>'])

    queue_body += '</QueueDescription>'
    return _create_entry(queue_body)


def _service_bus_error_handler(http_error):
    ''' Simple error handler for service bus service. '''
    return _general_error_handler(http_error)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



