def _write_data_pair()

in src/python/qpid_interop_test/amqp_complex_types_test_generator.py [0:0]


    def _write_data_pair(self, indent_level, data_pair, separator=',', eol=True, indent=True):
        """Write a JOSN pair ['amqp_type', value]"""
        indent_str = ' ' * (indent_level * INDENT_LEVEL_SIZE) if indent else ''
        post_indent_str = ' ' * (indent_level * INDENT_LEVEL_SIZE)
        eol_char = '\n' if eol else ''
        amqp_type, value = data_pair
        if amqp_type == 'null':
            self.target_file.write('%sNone%s%s' % (indent_str, separator, eol_char))
        elif amqp_type == 'boolean':
            self.target_file.write('%s%s%s%s' % (indent_str, value, separator, eol_char))
        elif amqp_type == 'ubyte':
            self.target_file.write('%sproton.ubyte(%s)%s%s' % (indent_str, value, separator, eol_char))
        elif amqp_type == 'byte':
            self.target_file.write('%sproton.byte(%s)%s%s' % (indent_str, value, separator, eol_char))
        elif amqp_type == 'ushort':
            self.target_file.write('%sproton.ushort(%s)%s%s' % (indent_str, value, separator, eol_char))
        elif amqp_type == 'short':
            self.target_file.write('%sproton.short(%s)%s%s' % (indent_str, value, separator, eol_char))
        elif amqp_type == 'uint':
            self.target_file.write('%sproton.uint(%s)%s%s' % (indent_str, value, separator, eol_char))
        elif amqp_type == 'int':
            self.target_file.write('%sproton.int32(%s)%s%s' % (indent_str, value, separator, eol_char))
        elif amqp_type == 'ulong':
            self.target_file.write('%sproton.ulong(%s)%s%s' % (indent_str, value, separator, eol_char))
        elif amqp_type == 'long':
            if isinstance(value, str):
                if (len(value) > 2 and value[:2] == '0x') or (len(value) > 3 and value[:3] == '-0x'):
                    self.target_file.write('%sint(u\'%s\', 16)%s%s' %
                                           (indent_str, value, separator, eol_char))
                else:
                    self.target_file.write('%sint(u\'%s\', 10)%s%s' %
                                           (indent_str, value, separator, eol_char))
            else:
                self.target_file.write('%sint(%s)%s%s' % (indent_str, value, separator, eol_char))
        elif amqp_type == 'float':
            if isinstance(value, str) and (value[-3:] == 'inf' or value[-3:] == 'NaN'):
                self.target_file.write('%sproton.float32(u\'%s\')%s%s' % (indent_str, value, separator, eol_char))
            else:
                self.target_file.write('%sproton.float32(%s)%s%s' % (indent_str, value, separator, eol_char))
        elif amqp_type == 'double':
            if isinstance(value, str) and (value[-3:] == 'inf' or value[-3:] == 'NaN'):
                self.target_file.write('%sfloat(u\'%s\')%s%s' % (indent_str, value, separator, eol_char))
            else:
                self.target_file.write('%sfloat(%s)%s%s' % (indent_str, value, separator, eol_char))
        elif amqp_type == 'decimal32':
            self.target_file.write('%sproton.decimal32(%s)%s%s' % (indent_str, value, separator, eol_char))
        elif amqp_type == 'decimal64':
            self.target_file.write('%sproton.decimal64(%s)%s%s' % (indent_str, value, separator, eol_char))
        elif amqp_type == 'decimal128':
            byte_itr = iter(value[2:])
            self.target_file.write('%sproton.decimal128(b\'' % indent_str)
            for char_ in byte_itr:
                self.target_file.write('\\x%c%c' % (char_, next(byte_itr)))
            self.target_file.write('\')%s%s' % (separator, eol_char))
        elif amqp_type == 'char':
            self.target_file.write('%sproton.char(' % indent_str)
            if len(value) == 1: # single char
                self.target_file.write('u\'%s\'' % value)
            else:
                self.target_file.write('chr(int(u\'%s\', 16))' % value)
            self.target_file.write(')%s%s' % (separator, eol_char))
        elif amqp_type == 'timestamp':
            self.target_file.write('%sproton.timestamp(%s)%s%s' % (indent_str, value, separator, eol_char))
        elif amqp_type == 'uuid':
            self.target_file.write('%suuid.UUID(' % indent_str)
            if value[:2] == '0x':
                self.target_file.write('int=%s' % value)
            else:
                self.target_file.write('u\'{%s}\'' % value)
            self.target_file.write(')%s%s' % (separator, eol_char))
        elif amqp_type == 'binary':
            if isinstance(value, int):
                value = hex(value)
            if isinstance(value, str):
                self.target_file.write('%sb\'' % indent_str)
                if value[:2] == '0x':
                    value = value[2:]
                    if len(value) % 2 > 0:
                        value = '0' + value
                    byte_itr = iter(value)
                    for char_ in byte_itr:
                        self.target_file.write('\\x%c%c' % (char_, next(byte_itr)))
                else:
                    for char_ in value:
                        if char_ == '\'':
                            self.target_file.write('\\')
                        self.target_file.write(char_)
                self.target_file.write('\'%s%s' % (separator, eol_char))
            else:
                self.target_file.write('%s%d%s%s' % (indent_str, value, separator, eol_char))
        elif amqp_type == 'string':
            self.target_file.write('%su\'' % indent_str)
            for char_ in value:
                if char_ == '\'':
                    self.target_file.write('\\')
                self.target_file.write(char_)
            self.target_file.write('\'%s%s' % (separator, eol_char))
        elif amqp_type == 'symbol':
            self.target_file.write('%sproton.symbol(u\'' % indent_str)
            for char_ in value:
                if char_ == '\'':
                    self.target_file.write('\\')
                self.target_file.write(char_)
            self.target_file.write('\')%s%s' % (separator, eol_char))
        elif amqp_type == 'array':
            if not isinstance(value, list):
                raise RuntimeError('AMQP array value not a list, found %s' % type(value))
            amqp_type = None
            if value:
                amqp_type = value[0][0]
            self.target_file.write('%sproton.Array(proton.UNDESCRIBED, %s' %
                                   (indent_str, PythonGenerator._proton_type_code(amqp_type)))
            if amqp_type:
                self.target_file.write(', \n')
            for value_data_pair in value:
                if value_data_pair[0] != amqp_type:
                    raise RuntimeError('AMQP array of type %s has element of type %s' %
                                       (amqp_type, value_data_pair[0]))
                self._write_data_pair(indent_level+1, value_data_pair)
            if amqp_type:
                self.target_file.write('%s),%s' % (post_indent_str, eol_char))
            else:
                self.target_file.write('),%s' % eol_char)
        elif amqp_type == 'list':
            if not isinstance(value, list):
                raise RuntimeError('AMQP list value not a list, found %s' % type(value))
            self.target_file.write('%s[\n' % indent_str)
            for value_data_pair in value:
                self._write_data_pair(indent_level+1, value_data_pair)
            self.target_file.write('%s],%s' % (post_indent_str, eol_char))
        elif amqp_type == 'map':
            if not isinstance(value, list):
                raise RuntimeError('AMQP map value not a list, found %s' % type(value))
            if len(value) % 2 != 0:
                raise RuntimeError('AMQP map value list not even, contains %d items' % len(value))
            self.target_file.write('%s{\n' % indent_str)
            # consume list in pairs (key, value)
            value_iter = iter(value)
            for value_data_pair in value_iter:
                self._write_data_pair(indent_level+1, value_data_pair, separator=': ', eol=False)
                value_data_pair = next(value_iter)
                self._write_data_pair(indent_level+1, value_data_pair, indent=False)
            self.target_file.write('%s},%s' % (post_indent_str, eol_char))
        else:
            raise RuntimeError('Unknown AMQP type \'%s\'' % amqp_type)