uamqp/utils.py (83 lines of code) (raw):

#------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for # license information. #-------------------------------------------------------------------------- import base64 import calendar import time import uuid import logging from datetime import timedelta, datetime from uamqp import c_uamqp logger = logging.getLogger(__name__) def get_running_loop(): try: import asyncio # pylint: disable=import-error return asyncio.get_running_loop() except AttributeError: # 3.6 loop = None try: loop = asyncio._get_running_loop() # pylint: disable=protected-access except AttributeError: logger.warning('This version of Python is deprecated, please upgrade to >= v3.6') if loop is None: logger.warning('No running event loop') loop = asyncio.get_event_loop() return loop except RuntimeError: # For backwards compatibility, create new event loop logger.warning('No running event loop') return asyncio.get_event_loop() def parse_connection_string(connect_str): """Parse a connection string such as those provided by the Azure portal. Connection string should be formatted like: `Key=Value;Key=Value;Key=Value`. The connection string will be parsed into a dictionary. :param connect_str: The connection string. :type connect_str: str :rtype: dict[str, str] """ connect_info = {} fields = connect_str.split(';') for field in fields: key, value = field.split('=', 1) connect_info[key] = value return connect_info def create_sas_token(key_name, shared_access_key, scope, expiry=timedelta(hours=1)): """Create a SAS token. :param key_name: The username/key name/policy name for the token. :type key_name: bytes :param shared_access_key: The shared access key to generate the token from. :type shared_access_key: bytes :param scope: The token permissions scope. :type scope: bytes :param expiry: The lifetime of the generated token. Default is 1 hour. :type expiry: ~datetime.timedelta :rtype: bytes """ shared_access_key = base64.b64encode(shared_access_key) abs_expiry = int(time.time()) + expiry.seconds return c_uamqp.create_sas_token(shared_access_key, scope, key_name, abs_expiry) def _convert_py_number(value): """Convert a Python integer value into equivalent C object. Will attempt to use the smallest possible conversion, starting with int, then long then double. """ try: return c_uamqp.int_value(value) except OverflowError: pass try: return c_uamqp.long_value(value) except OverflowError: pass return c_uamqp.double_value(value) def data_factory(value, encoding='UTF-8'): """Wrap a Python type in the equivalent C AMQP type. If the Python type has already been wrapped in a ~uamqp.types.AMQPType object - then this will be used to select the appropriate C type. - bool => c_uamqp.BoolValue - int => c_uamqp.IntValue, LongValue, DoubleValue - str => c_uamqp.StringValue - bytes => c_uamqp.BinaryValue - list/set/tuple => c_uamqp.ListValue - dict => c_uamqp.DictValue (AMQP map) - float => c_uamqp.DoubleValue - uuid.UUID => c_uamqp.UUIDValue :param value: The value to wrap. :type value: ~uamqp.types.AMQPType :rtype: uamqp.c_uamqp.AMQPValue """ result = None if value is None: result = c_uamqp.null_value() elif hasattr(value, 'c_data'): result = value.c_data elif isinstance(value, c_uamqp.AMQPValue): result = value elif isinstance(value, bool): result = c_uamqp.bool_value(value) elif isinstance(value, str): result = c_uamqp.string_value(value.encode(encoding)) elif isinstance(value, bytes): result = c_uamqp.string_value(value) elif isinstance(value, uuid.UUID): result = c_uamqp.uuid_value(value) elif isinstance(value, bytearray): result = c_uamqp.binary_value(value) elif isinstance(value, int): result = _convert_py_number(value) elif isinstance(value, float): result = c_uamqp.double_value(value) elif isinstance(value, dict): wrapped_dict = c_uamqp.dict_value() for key, item in value.items(): wrapped_dict[data_factory(key, encoding=encoding)] = data_factory(item, encoding=encoding) result = wrapped_dict elif isinstance(value, (list, set, tuple)): wrapped_list = c_uamqp.list_value() wrapped_list.size = len(value) for index, item in enumerate(value): wrapped_list[index] = data_factory(item, encoding=encoding) result = wrapped_list elif isinstance(value, datetime): timestamp = int((calendar.timegm(value.utctimetuple()) * 1000) + (value.microsecond/1000)) result = c_uamqp.timestamp_value(timestamp) return result