CustomScript/azure/servicebus/__init__.py (675 lines of code) (raw):
#-------------------------------------------------------------------------
# Copyright (c) Microsoft. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#--------------------------------------------------------------------------
import ast
import json
import sys
from datetime import datetime
from xml.dom import minidom
from azure import (
WindowsAzureData,
WindowsAzureError,
xml_escape,
_create_entry,
_general_error_handler,
_get_entry_properties,
_get_child_nodes,
_get_children_from_path,
_get_first_child_node_value,
_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_DELETE,
_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_UNLOCK,
_ERROR_QUEUE_NOT_FOUND,
_ERROR_TOPIC_NOT_FOUND,
)
from azure.http import HTTPError
# Default rule name for a subscription.
DEFAULT_RULE_NAME = '$Default'
#-----------------------------------------------------------------------------
# Constants for Azure app environment settings.
# NOTE(review): presumably these are the setting names the service client
# looks up for its namespace and credentials -- they are not read anywhere in
# this file, so confirm against the code that consumes them.
AZURE_SERVICEBUS_NAMESPACE = 'AZURE_SERVICEBUS_NAMESPACE'
AZURE_SERVICEBUS_ACCESS_KEY = 'AZURE_SERVICEBUS_ACCESS_KEY'
AZURE_SERVICEBUS_ISSUER = 'AZURE_SERVICEBUS_ISSUER'
# XML Schema instance namespace. _convert_xml_to_rule uses it to read the
# namespaced "type" attribute of the Filter and Action elements of a rule.
XML_SCHEMA_NAMESPACE = 'http://www.w3.org/2001/XMLSchema-instance'
class Queue(WindowsAzureData):

    ''' Plain data holder for a Queue Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780773

    Every constructor argument is optional (defaults to None) and is stored
    unchanged on the attribute of the same name. '''

    def __init__(self, lock_duration=None, max_size_in_megabytes=None,
                 requires_duplicate_detection=None, requires_session=None,
                 default_message_time_to_live=None,
                 dead_lettering_on_message_expiration=None,
                 duplicate_detection_history_time_window=None,
                 max_delivery_count=None, enable_batched_operations=None,
                 size_in_bytes=None, message_count=None):
        # The attributes are created in a fixed order; keep it as is.
        self.lock_duration = lock_duration
        self.max_size_in_megabytes = max_size_in_megabytes
        self.requires_duplicate_detection = requires_duplicate_detection
        self.requires_session = requires_session
        self.default_message_time_to_live = default_message_time_to_live
        self.dead_lettering_on_message_expiration = (
            dead_lettering_on_message_expiration)
        self.duplicate_detection_history_time_window = (
            duplicate_detection_history_time_window)
        self.max_delivery_count = max_delivery_count
        self.enable_batched_operations = enable_batched_operations
        self.size_in_bytes = size_in_bytes
        self.message_count = message_count
class Topic(WindowsAzureData):

    ''' Plain data holder for a Topic Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780749.

    Every constructor argument is optional (defaults to None) and is stored
    unchanged on the attribute of the same name. '''

    def __init__(self, default_message_time_to_live=None,
                 max_size_in_megabytes=None, requires_duplicate_detection=None,
                 duplicate_detection_history_time_window=None,
                 enable_batched_operations=None, size_in_bytes=None):
        self.default_message_time_to_live = default_message_time_to_live
        self.max_size_in_megabytes = max_size_in_megabytes
        self.requires_duplicate_detection = requires_duplicate_detection
        self.duplicate_detection_history_time_window = (
            duplicate_detection_history_time_window)
        self.enable_batched_operations = enable_batched_operations
        self.size_in_bytes = size_in_bytes

    @property
    def max_size_in_mega_bytes(self):
        ''' Old spelling of max_size_in_megabytes. Reading it emits a warning
        and returns the value of max_size_in_megabytes. '''
        import warnings
        notice = 'This attribute has been changed to max_size_in_megabytes.'
        warnings.warn(notice)
        return self.max_size_in_megabytes

    @max_size_in_mega_bytes.setter
    def max_size_in_mega_bytes(self, value):
        # Writing through the old spelling updates the new attribute; unlike
        # the getter, no warning is emitted here.
        self.max_size_in_megabytes = value
class Subscription(WindowsAzureData):

    ''' Plain data holder for a Subscription Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780763.

    Every constructor argument is optional (defaults to None) and is stored
    unchanged on the attribute of the same name. '''

    def __init__(self, lock_duration=None, requires_session=None,
                 default_message_time_to_live=None,
                 dead_lettering_on_message_expiration=None,
                 dead_lettering_on_filter_evaluation_exceptions=None,
                 enable_batched_operations=None, max_delivery_count=None,
                 message_count=None):
        # The attributes are created in a fixed order; keep it as is.
        self.lock_duration = lock_duration
        self.requires_session = requires_session
        self.default_message_time_to_live = default_message_time_to_live
        self.dead_lettering_on_message_expiration = (
            dead_lettering_on_message_expiration)
        self.dead_lettering_on_filter_evaluation_exceptions = (
            dead_lettering_on_filter_evaluation_exceptions)
        self.enable_batched_operations = enable_batched_operations
        self.max_delivery_count = max_delivery_count
        self.message_count = message_count
class Rule(WindowsAzureData):

    ''' Rule class corresponding to Rule Description:
    http://msdn.microsoft.com/en-us/library/windowsazure/hh780753.

    filter_type: value written to the i:type attribute of the Filter element.
    filter_expression: expression text of the filter.
    action_type: value written to the i:type attribute of the Action element.
    action_expression: expression text of the action.

    Every argument is optional (defaults to None) and is stored unchanged on
    the attribute of the same name. '''

    def __init__(self, filter_type=None, filter_expression=None,
                 action_type=None, action_expression=None):
        self.filter_type = filter_type
        self.filter_expression = filter_expression
        self.action_type = action_type
        # Store the expression itself. This used to be assigned action_type,
        # which silently discarded the action_expression argument.
        self.action_expression = action_expression
class Message(WindowsAzureData):

    ''' Message class used by the send message / get message apis.

    body: payload of the message.
    service_bus_service: the service bus client used by delete() and
        unlock(). When it is not given, location is not parsed and the
        message cannot be deleted or unlocked through this object.
    location: location of a received message. The queue name, or the topic
        and subscription names, are extracted from it.
    custom_properties: dict of user properties, sent as individual headers.
    type: value of the Content-Type header.
    broker_properties: broker properties, either as an already serialized
        string or as a dict (a dict is serialized to JSON when sent).
    '''

    def __init__(self, body=None, service_bus_service=None, location=None,
                 custom_properties=None,
                 type='application/atom+xml;type=entry;charset=utf-8',
                 broker_properties=None):
        self.body = body
        self.location = location
        self.broker_properties = broker_properties
        self.custom_properties = custom_properties
        self.type = type
        self.service_bus_service = service_bus_service
        self._topic_name = None
        self._subscription_name = None
        self._queue_name = None

        if not service_bus_service:
            return

        # if location is set, then extracts the queue name for queue message
        # and extracts the topic and subscriptions name if it is topic
        # message.
        if location:
            if '/subscriptions/' in location:
                pos = location.find('/subscriptions/')
                pos1 = location.rfind('/', 0, pos - 1)
                self._topic_name = location[pos1 + 1:pos]
                pos += len('/subscriptions/')
                pos1 = location.find('/', pos)
                if pos1 == -1:
                    # Nothing follows the subscription name. Slicing with -1
                    # would drop the last character of the name.
                    pos1 = len(location)
                self._subscription_name = location[pos:pos1]
            elif '/messages/' in location:
                pos = location.find('/messages/')
                pos1 = location.rfind('/', 0, pos - 1)
                self._queue_name = location[pos1 + 1:pos]

    def delete(self):
        ''' Deletes itself if find queue name or topic name and subscription
        name.

        Raises WindowsAzureError when neither a queue name nor a topic and
        subscription name were extracted from location. '''
        if self._queue_name:
            self.service_bus_service.delete_queue_message(
                self._queue_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        elif self._topic_name and self._subscription_name:
            self.service_bus_service.delete_subscription_message(
                self._topic_name,
                self._subscription_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        else:
            raise WindowsAzureError(_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_DELETE)

    def unlock(self):
        ''' Unlocks itself if find queue name or topic name and subscription
        name.

        Raises WindowsAzureError when neither a queue name nor a topic and
        subscription name were extracted from location. '''
        if self._queue_name:
            self.service_bus_service.unlock_queue_message(
                self._queue_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        elif self._topic_name and self._subscription_name:
            self.service_bus_service.unlock_subscription_message(
                self._topic_name,
                self._subscription_name,
                self.broker_properties['SequenceNumber'],
                self.broker_properties['LockToken'])
        else:
            raise WindowsAzureError(_ERROR_MESSAGE_NOT_PEEK_LOCKED_ON_UNLOCK)

    def add_headers(self, request):
        ''' Adds the additional headers of a message request.

        request: the request whose headers list is appended to.

        Appends one header per custom property, the Content-Type header and,
        when broker properties are set, the BrokerProperties header. Returns
        request.headers. '''
        # Adds custom properties
        if self.custom_properties:
            for name, value in self.custom_properties.items():
                if sys.version_info < (3,) and isinstance(value, unicode):
                    request.headers.append(
                        (name, '"' + value.encode('utf-8') + '"'))
                elif isinstance(value, str):
                    request.headers.append((name, '"' + str(value) + '"'))
                elif isinstance(value, datetime):
                    request.headers.append(
                        (name,
                         '"' + value.strftime('%a, %d %b %Y %H:%M:%S GMT')
                         + '"'))
                else:
                    # numbers and booleans are sent unquoted; lower() turns
                    # True/False into true/false.
                    request.headers.append((name, str(value).lower()))

        # Adds content-type
        request.headers.append(('Content-Type', self.type))

        # Adds BrokerProperties
        if self.broker_properties:
            if isinstance(self.broker_properties, dict):
                # A dict (e.g. the broker properties of a received message)
                # must be sent as JSON; str() would produce a Python repr
                # with single quotes, which is not valid JSON.
                serialized = json.dumps(self.broker_properties)
            else:
                serialized = str(self.broker_properties)
            request.headers.append(('BrokerProperties', serialized))

        return request.headers
def _create_message(response, service_instance):
    ''' Create message from response.

    response: response from service bus cloud server.
    service_instance: the service bus client.

    The BrokerProperties, Content-Type and Location headers fill the
    corresponding fields of the message. Transfer-Encoding, Server and Date
    are ignored. Every other header is treated as a custom property:
    a quoted value becomes a datetime (when it matches the date format) or a
    string, an unquoted true/false becomes a bool and any other unquoted
    value becomes an int or a float. Unquoted values that are not numbers
    are skipped instead of aborting the whole conversion.
    '''
    custom_properties = {}
    broker_properties = None
    message_type = None
    message_location = None

    # gets all information from respheaders.
    for name, value in response.headers:
        header = name.lower()
        if header == 'brokerproperties':
            broker_properties = json.loads(value)
        elif header == 'content-type':
            message_type = value
        elif header == 'location':
            message_location = value
        elif header not in ('transfer-encoding', 'server', 'date'):
            if '"' in value:
                value = value[1:-1]
                try:
                    custom_properties[name] = datetime.strptime(
                        value, '%a, %d %b %Y %H:%M:%S GMT')
                except ValueError:
                    custom_properties[name] = value
            elif value.lower() == 'true':
                custom_properties[name] = True
            elif value.lower() == 'false':
                custom_properties[name] = False
            else:
                try:
                    number = float(value)
                except ValueError:
                    # Neither quoted, boolean nor numeric: this is not a
                    # custom property written by this module (for instance
                    # an ordinary HTTP header), so ignore it.
                    continue
                # int('3.1') doesn't work so the value goes through float
                # first; int() raises for inf and nan, which stay floats.
                try:
                    is_integer = str(int(number)) == value
                except (OverflowError, ValueError):
                    is_integer = False
                custom_properties[name] = int(value) if is_integer else number

    if message_type is None:
        message_type = 'application/atom+xml;type=entry;charset=utf-8'

    return Message(response.body, service_instance, message_location,
                   custom_properties, message_type, broker_properties)
# convert functions
def _convert_response_to_rule(response):
    ''' Converts the xml in the body of response to a rule object. '''
    xmlstr = response.body
    return _convert_xml_to_rule(xmlstr)
def _convert_xml_to_rule(xmlstr):
    ''' Converts response xml to rule object.

    The format of xml for rule:
    <entry xmlns='http://www.w3.org/2005/Atom'>
    <content type='application/xml'>
    <RuleDescription
        xmlns:i="http://www.w3.org/2001/XMLSchema-instance"
        xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
        <Filter i:type="SqlFilterExpression">
            <SqlExpression>MyProperty='XYZ'</SqlExpression>
        </Filter>
        <Action i:type="SqlFilterAction">
            <SqlExpression>set MyProperty2 = 'ABC'</SqlExpression>
        </Action>
    </RuleDescription>
    </content>
    </entry>

    xmlstr: the xml to convert.

    Returns a Rule. filter_type and action_type come from the i:type
    attributes; filter_expression comes from the SqlExpression child of
    Filter and action_expression from the first child element of Action.
    An expression element without text leaves the attribute untouched.
    '''
    xmldoc = minidom.parseString(xmlstr)
    rule = Rule()

    for rule_desc in _get_children_from_path(xmldoc,
                                             'entry',
                                             'content',
                                             'RuleDescription'):
        for xml_filter in _get_child_nodes(rule_desc, 'Filter'):
            filter_type = xml_filter.getAttributeNS(
                XML_SCHEMA_NAMESPACE, 'type')
            rule.filter_type = str(filter_type)

            if xml_filter.childNodes:
                for expr in _get_child_nodes(xml_filter, 'SqlExpression'):
                    # An empty <SqlExpression/> has no text node; reading
                    # nodeValue from None would raise AttributeError.
                    if expr.firstChild is not None:
                        rule.filter_expression = expr.firstChild.nodeValue

        for xml_action in _get_child_nodes(rule_desc, 'Action'):
            action_type = xml_action.getAttributeNS(
                XML_SCHEMA_NAMESPACE, 'type')
            rule.action_type = str(action_type)

            # Use the first child *element*: with indented xml the very first
            # child is a whitespace text node, which has no children, so the
            # expression would be silently lost.
            for child in xml_action.childNodes:
                if child.nodeType != child.ELEMENT_NODE:
                    continue
                if child.firstChild is not None:
                    rule.action_expression = child.firstChild.nodeValue
                break

    # extract id, updated and name value from feed entry and set them of rule.
    for name, value in _get_entry_properties(xmlstr, True, '/rules').items():
        setattr(rule, name, value)

    return rule
def _convert_response_to_queue(response):
    ''' Converts the xml in the body of response to a queue object. '''
    xmlstr = response.body
    return _convert_xml_to_queue(xmlstr)
def _parse_bool(value):
if value.lower() == 'true':
return True
return False
def _convert_xml_to_queue(xmlstr):
    ''' Converts xml response to queue object.

    The QueueDescription elements found under entry/content are read; each
    of the child elements listed below that is present sets the matching
    attribute of the queue:
    <QueueDescription
        xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
        <LockDuration>PT2M</LockDuration>
        <MaxSizeInMegabytes>1024</MaxSizeInMegabytes>
        <DefaultMessageTimeToLive>PT5M</DefaultMessageTimeToLive>
        ...
    </QueueDescription>

    Raises WindowsAzureError when none of the known elements is found.
    '''
    # (element name, Queue attribute, converter), in lookup order. A
    # converter of None keeps the element text as it is.
    fields = (
        ('LockDuration', 'lock_duration', None),
        ('MaxSizeInMegabytes', 'max_size_in_megabytes', int),
        ('RequiresDuplicateDetection', 'requires_duplicate_detection',
         _parse_bool),
        ('RequiresSession', 'requires_session', _parse_bool),
        ('DefaultMessageTimeToLive', 'default_message_time_to_live', None),
        ('DeadLetteringOnMessageExpiration',
         'dead_lettering_on_message_expiration', _parse_bool),
        ('DuplicateDetectionHistoryTimeWindow',
         'duplicate_detection_history_time_window', None),
        ('EnableBatchedOperations', 'enable_batched_operations', _parse_bool),
        ('MaxDeliveryCount', 'max_delivery_count', int),
        ('MessageCount', 'message_count', int),
        ('SizeInBytes', 'size_in_bytes', int),
    )

    xmldoc = minidom.parseString(xmlstr)
    queue = Queue()
    found_any = False

    # if none of the known elements is found then the response is not valid
    # xml for Queue.
    for desc in _get_children_from_path(xmldoc,
                                        'entry',
                                        'content',
                                        'QueueDescription'):
        for tag, attribute, convert in fields:
            text = _get_first_child_node_value(desc, tag)
            if text is None:
                continue
            if convert is not None:
                text = convert(text)
            setattr(queue, attribute, text)
            found_any = True

    if not found_any:
        raise WindowsAzureError(_ERROR_QUEUE_NOT_FOUND)

    # extract id, updated and name value from feed entry and set them of
    # queue.
    entry_properties = _get_entry_properties(xmlstr, True)
    for name, value in entry_properties.items():
        setattr(queue, name, value)

    return queue
def _convert_response_to_topic(response):
    ''' Converts the xml in the body of response to a topic object. '''
    xmlstr = response.body
    return _convert_xml_to_topic(xmlstr)
def _convert_xml_to_topic(xmlstr):
    '''Converts xml response to topic

    The xml format for topic:
    <entry xmlns='http://www.w3.org/2005/Atom'>
    <content type='application/xml'>
    <TopicDescription
        xmlns:i="http://www.w3.org/2001/XMLSchema-instance"
        xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
        <DefaultMessageTimeToLive>P10675199DT2H48M5.4775807S</DefaultMessageTimeToLive>
        <MaxSizeInMegabytes>1024</MaxSizeInMegabytes>
        <RequiresDuplicateDetection>false</RequiresDuplicateDetection>
        <DuplicateDetectionHistoryTimeWindow>P7D</DuplicateDetectionHistoryTimeWindow>
    </TopicDescription>
    </content>
    </entry>

    Raises WindowsAzureError when none of the known elements is found.
    '''
    # (element name, Topic attribute, converter), in lookup order. A
    # converter of None keeps the element text as it is.
    fields = (
        ('DefaultMessageTimeToLive', 'default_message_time_to_live', None),
        ('MaxSizeInMegabytes', 'max_size_in_megabytes', int),
        ('RequiresDuplicateDetection', 'requires_duplicate_detection',
         _parse_bool),
        ('DuplicateDetectionHistoryTimeWindow',
         'duplicate_detection_history_time_window', None),
        ('EnableBatchedOperations', 'enable_batched_operations', _parse_bool),
        ('SizeInBytes', 'size_in_bytes', int),
    )

    xmldoc = minidom.parseString(xmlstr)
    topic = Topic()
    nothing_found = True

    # if none of the known elements is found then the response is not valid
    # xml for Topic.
    for desc in _get_children_from_path(xmldoc,
                                        'entry',
                                        'content',
                                        'TopicDescription'):
        # The flag starts over for every description, so the last
        # TopicDescription decides whether the xml counts as valid.
        nothing_found = True
        for tag, attribute, convert in fields:
            text = _get_first_child_node_value(desc, tag)
            if text is None:
                continue
            if convert is not None:
                text = convert(text)
            setattr(topic, attribute, text)
            nothing_found = False

    if nothing_found:
        raise WindowsAzureError(_ERROR_TOPIC_NOT_FOUND)

    # extract id, updated and name value from feed entry and set them of
    # topic.
    entry_properties = _get_entry_properties(xmlstr, True)
    for name, value in entry_properties.items():
        setattr(topic, name, value)

    return topic
def _convert_response_to_subscription(response):
    ''' Converts the xml in the body of response to a subscription
    object. '''
    xmlstr = response.body
    return _convert_xml_to_subscription(xmlstr)
def _convert_xml_to_subscription(xmlstr):
    '''Converts xml response to subscription

    The xml format for subscription:
    <entry xmlns='http://www.w3.org/2005/Atom'>
    <content type='application/xml'>
    <SubscriptionDescription
        xmlns:i="http://www.w3.org/2001/XMLSchema-instance"
        xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">
        <LockDuration>PT5M</LockDuration>
        <RequiresSession>false</RequiresSession>
        <DefaultMessageTimeToLive>P10675199DT2H48M5.4775807S</DefaultMessageTimeToLive>
        <DeadLetteringOnMessageExpiration>false</DeadLetteringOnMessageExpiration>
        <DeadLetteringOnFilterEvaluationExceptions>true</DeadLetteringOnFilterEvaluationExceptions>
    </SubscriptionDescription>
    </content>
    </entry>

    Elements that are absent leave the matching attribute at None; unlike
    the queue and topic converters no error is raised when nothing is found.
    '''
    # (element name, Subscription attribute, converter), in lookup order. A
    # converter of None keeps the element text as it is.
    fields = (
        ('LockDuration', 'lock_duration', None),
        ('RequiresSession', 'requires_session', _parse_bool),
        ('DefaultMessageTimeToLive', 'default_message_time_to_live', None),
        ('DeadLetteringOnFilterEvaluationExceptions',
         'dead_lettering_on_filter_evaluation_exceptions', _parse_bool),
        ('DeadLetteringOnMessageExpiration',
         'dead_lettering_on_message_expiration', _parse_bool),
        ('EnableBatchedOperations', 'enable_batched_operations', _parse_bool),
        ('MaxDeliveryCount', 'max_delivery_count', int),
        ('MessageCount', 'message_count', int),
    )

    xmldoc = minidom.parseString(xmlstr)
    subscription = Subscription()

    for desc in _get_children_from_path(xmldoc,
                                        'entry',
                                        'content',
                                        'SubscriptionDescription'):
        for tag, attribute, convert in fields:
            text = _get_first_child_node_value(desc, tag)
            if text is None:
                continue
            if convert is not None:
                text = convert(text)
            setattr(subscription, attribute, text)

    # copy the feed entry properties onto the subscription.
    entry_properties = _get_entry_properties(xmlstr, True, '/subscriptions')
    for name, value in entry_properties.items():
        setattr(subscription, name, value)

    return subscription
def _convert_subscription_to_xml(subscription):
    '''
    Converts a subscription object to xml to send. The order of the fields
    in the xml is very important, so the elements are written from a fixed
    table instead of calling convert_class_to_xml.

    subscription: the subscription object to be converted. A false value
        yields an empty SubscriptionDescription; attributes that are None
        are left out.
    '''
    # (element name, attribute name, lowercase the text), in output order.
    # Lowercasing turns True/False into true/false.
    layout = (
        ('LockDuration', 'lock_duration', False),
        ('RequiresSession', 'requires_session', True),
        ('DefaultMessageTimeToLive', 'default_message_time_to_live', False),
        ('DeadLetteringOnMessageExpiration',
         'dead_lettering_on_message_expiration', True),
        ('DeadLetteringOnFilterEvaluationExceptions',
         'dead_lettering_on_filter_evaluation_exceptions', True),
        ('EnableBatchedOperations', 'enable_batched_operations', True),
        ('MaxDeliveryCount', 'max_delivery_count', False),
        ('MessageCount', 'message_count', False),
    )

    pieces = ['<SubscriptionDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">']
    if subscription:
        for tag, attribute, lowercase in layout:
            value = getattr(subscription, attribute)
            if value is None:
                continue
            text = str(value)
            if lowercase:
                text = text.lower()
            pieces.append('<' + tag + '>' + text + '</' + tag + '>')
    pieces.append('</SubscriptionDescription>')

    return _create_entry(''.join(pieces))
def _convert_rule_to_xml(rule):
    '''
    Converts a rule object to xml to send. The order of the fields in the
    xml is very important, so the xml is assembled by hand instead of
    calling convert_class_to_xml.

    rule: the rule object to be converted. A false value yields an empty
        RuleDescription. The Filter element is written only when filter_type
        is set and the Action element only when action_type is set.
    '''
    pieces = ['<RuleDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">']

    if rule:
        if rule.filter_type:
            pieces.append(
                '<Filter i:type="' + xml_escape(rule.filter_type) + '">')
            if rule.filter_type == 'CorrelationFilter':
                pieces.append(
                    '<CorrelationId>' +
                    xml_escape(rule.filter_expression) +
                    '</CorrelationId>')
            else:
                # every other filter type is written as a sql expression
                pieces.append(
                    '<SqlExpression>' +
                    xml_escape(rule.filter_expression) +
                    '</SqlExpression>')
                pieces.append('<CompatibilityLevel>20</CompatibilityLevel>')
            pieces.append('</Filter>')

        if rule.action_type:
            pieces.append(
                '<Action i:type="' + xml_escape(rule.action_type) + '">')
            # only a SqlRuleAction carries an expression; other action types
            # produce an empty Action element.
            if rule.action_type == 'SqlRuleAction':
                pieces.append(
                    '<SqlExpression>' +
                    xml_escape(rule.action_expression) +
                    '</SqlExpression>')
                pieces.append('<CompatibilityLevel>20</CompatibilityLevel>')
            pieces.append('</Action>')

    pieces.append('</RuleDescription>')

    return _create_entry(''.join(pieces))
def _convert_topic_to_xml(topic):
    '''
    Converts a topic object to xml to send. The order of the fields in the
    xml is very important, so the elements are written from a fixed table
    instead of calling convert_class_to_xml.

    topic: the topic object to be converted. A false value yields an empty
        TopicDescription; attributes that are None are left out.
    '''
    # (element name, attribute name, lowercase the text), in output order.
    # Lowercasing turns True/False into true/false.
    layout = (
        ('DefaultMessageTimeToLive', 'default_message_time_to_live', False),
        ('MaxSizeInMegabytes', 'max_size_in_megabytes', False),
        ('RequiresDuplicateDetection', 'requires_duplicate_detection', True),
        ('DuplicateDetectionHistoryTimeWindow',
         'duplicate_detection_history_time_window', False),
        ('EnableBatchedOperations', 'enable_batched_operations', True),
        ('SizeInBytes', 'size_in_bytes', False),
    )

    pieces = ['<TopicDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">']
    if topic:
        for tag, attribute, lowercase in layout:
            value = getattr(topic, attribute)
            if value is None:
                continue
            text = str(value)
            if lowercase:
                text = text.lower()
            pieces.append('<' + tag + '>' + text + '</' + tag + '>')
    pieces.append('</TopicDescription>')

    return _create_entry(''.join(pieces))
def _convert_queue_to_xml(queue):
    '''
    Converts a queue object to xml to send. The order of the fields in the
    xml is very important, so the elements are written from a fixed table
    instead of calling convert_class_to_xml.

    queue: the queue object to be converted. A false value yields an empty
        QueueDescription; attributes that are None are left out.
    '''
    # (element name, attribute name, lowercase the text), in output order,
    # for everything that follows LockDuration. Lowercasing turns True/False
    # into true/false.
    layout = (
        ('MaxSizeInMegabytes', 'max_size_in_megabytes', False),
        ('RequiresDuplicateDetection', 'requires_duplicate_detection', True),
        ('RequiresSession', 'requires_session', True),
        ('DefaultMessageTimeToLive', 'default_message_time_to_live', False),
        ('DeadLetteringOnMessageExpiration',
         'dead_lettering_on_message_expiration', True),
        ('DuplicateDetectionHistoryTimeWindow',
         'duplicate_detection_history_time_window', False),
        ('MaxDeliveryCount', 'max_delivery_count', False),
        ('EnableBatchedOperations', 'enable_batched_operations', True),
        ('SizeInBytes', 'size_in_bytes', False),
        ('MessageCount', 'message_count', False),
    )

    pieces = ['<QueueDescription xmlns:i="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://schemas.microsoft.com/netservices/2010/10/servicebus/connect">']
    if queue:
        # LockDuration is the one field tested for truthiness rather than
        # for None, so an empty value is left out as well.
        if queue.lock_duration:
            pieces.append(
                '<LockDuration>' + str(queue.lock_duration) +
                '</LockDuration>')
        for tag, attribute, lowercase in layout:
            value = getattr(queue, attribute)
            if value is None:
                continue
            text = str(value)
            if lowercase:
                text = text.lower()
            pieces.append('<' + tag + '>' + text + '</' + tag + '>')
    pieces.append('</QueueDescription>')

    return _create_entry(''.join(pieces))
def _service_bus_error_handler(http_error):
    ''' Error handler for the service bus service: passes http_error on to
    the general azure error handler and returns whatever it returns. '''
    outcome = _general_error_handler(http_error)
    return outcome
from azure.servicebus.servicebusservice import ServiceBusService