c/tools/codec-generator/generate.py (444 lines of code) (raw):

import argparse import itertools import json import os import re import sys from typing import Any, Dict, List, Set, Tuple, Union class ParseError(Exception): def __init__(self, error): super().__init__(error) indent_size = 4 class ASTNode: def __init__(self, function_suffix: str, types: List[str], consume_types: Union[List[str], None] = None): self.function_suffix = function_suffix self.types = types self.consume_types = consume_types self.count_args = len(self.types) self.count_consume_args = len(self.consume_types) if self.consume_types else len(self.types) @staticmethod def mk_indent(indent: int): return " "*indent*indent_size @staticmethod def mk_funcall(name: str, args: List[str]): return f'{name}({", ".join(args)})' def gen_emit_code(self, prefix: List[str], first_arg: int, indent: int) -> List[str]: return [f'{self.mk_indent(indent)}{self.mk_funcall("emit_"+self.function_suffix, prefix + self.gen_args(first_arg))};'] def gen_params(self, first_arg: int) -> List[Tuple[str, str]]: return [(f'arg{i+first_arg}', self.types[i]) for i in range(len(self.types))] def gen_args(self, first_arg: int) -> List[str]: return [f'arg{i+first_arg}' for i in range(len(self.types))] def gen_consume_code(self, prefix: List[str], first_arg: int, indent: int) -> List[str]: return [f'{self.mk_indent(indent)}{self.mk_funcall("consume_"+self.function_suffix, prefix + self.gen_consume_args(first_arg))};'] @staticmethod def add_pointer(type: str) -> str: return f'{type}*' def gen_consume_params(self, first_arg: int) -> List[Tuple[str, str]]: if self.consume_types: return [(f'arg{i + first_arg}', self.consume_types[i]) for i in range(len(self.consume_types))] else: return [(f'arg{i + first_arg}', self.add_pointer(self.types[i])) for i in range(len(self.types))] def gen_consume_args(self, first_arg: int) -> List[str]: if self.consume_types: return [f'arg{i+first_arg}' for i in range(len(self.consume_types))] else: return [f'arg{i+first_arg}' for i in range(len(self.types))] def __repr__(self) -> str: return f'<{self.function_suffix}: {self.types}>' class NullNode(ASTNode): def __init__(self, function_suffix: str): super().__init__(function_suffix, []) def gen_emit_code(self, prefix: List[str], first_arg: int, indent: int) -> List[str]: return [f'{self.mk_indent(indent)}{self.mk_funcall("emit_"+self.function_suffix, prefix)};'] def gen_params(self, first_arg: int) -> List[Tuple[str, str]]: return [] class ListNode(ASTNode): def __init__(self, name: str, l: List[ASTNode], types: List[str]): self.list: List[ASTNode] = l super().__init__(name, types) self.count_args += sum([i.count_args for i in l]) self.count_consume_args += sum([i.count_consume_args for i in l]) def gen_params_list(self, first_arg: int) -> List[Tuple[str, str]]: params = [] arg = first_arg for n in self.list: r = n.gen_params(arg) arg += len(r) params.append(r) return [i for i in itertools.chain(*params)] def gen_params(self, first_arg: int) -> List[Tuple[str, str]]: args = super().gen_params(first_arg) return [ *args, *self.gen_params_list(first_arg + len(args)) ] def gen_consume_params_list(self, first_arg: int) -> List[Tuple[str, str]]: params = [] arg = first_arg for n in self.list: r = n.gen_consume_params(arg) arg += len(r) params.append(r) return [i for i in itertools.chain(*params)] def gen_consume_params(self, first_arg: int) -> List[Tuple[str, str]]: args = super().gen_consume_params(first_arg) return [ *args, *self.gen_consume_params_list(first_arg + len(args)) ] def gen_emit_code(self, prefix: List[str], first_arg: int, indent: int) -> List[str]: lines = [] arg = first_arg for n in self.list: lines.append(n.gen_emit_code(prefix, arg, indent)) arg += n.count_args return [i for i in itertools.chain(*lines)] def gen_consume_code(self, prefix: List[str], first_arg: int, indent: int) -> List[str]: lines = [] arg = first_arg for n in self.list: lines.append(n.gen_consume_code(prefix, arg, indent+1)) arg += n.count_consume_args return [ f'{self.mk_indent(indent)}{{', f'{self.mk_indent(indent+1)}pni_consumer_t subconsumer;', f'{self.mk_indent(indent+1)}uint32_t count;', f'{self.mk_indent(indent+1)}{self.mk_funcall("consume_list", prefix+["&subconsumer", "&count"])};', f'{self.mk_indent(indent+1)}pni_consumer_t consumer = subconsumer;', *[i for i in itertools.chain(*lines)], f'{self.mk_indent(indent+1)}{self.mk_funcall("consume_end_list", prefix)};', f'{self.mk_indent(indent)}}}' ] def __repr__(self) -> str: nl = "\n " return f'''<{self.function_suffix}: {self.types} {nl.join([repr(x) for x in self.list])} >''' class DescListNode(ListNode): def __init__(self, l: List[ASTNode]): super().__init__('described_list', l, ['uint64_t']) def gen_emit_code(self, prefix: List[str], first_arg: int, indent: int) -> List[str]: args = self.gen_args(first_arg) return [ f'{self.mk_indent(indent)}emit_descriptor({", ".join(prefix+args)});', f'{self.mk_indent(indent)}for (bool small_encoding = true; ; small_encoding = false) {{', f'{self.mk_indent(indent+1)}pni_compound_context c = ' f'{self.mk_funcall("emit_list", prefix+["small_encoding", "true"])};', f'{self.mk_indent(indent+1)}pni_compound_context compound = c;', *super().gen_emit_code(prefix, first_arg+len(args), indent + 1), f'{self.mk_indent(indent+1)}{self.mk_funcall("emit_end_list", prefix+["small_encoding"])};', f'{self.mk_indent(indent+1)}if (encode_succeeded({", ".join(prefix)})) break;', f'{self.mk_indent(indent)}}}', ] class DescListIgnoreTypeNode(ListNode): def __init__(self, l: List[ASTNode]): super().__init__('described_unknown_list', l, []) def gen_consume_code(self, prefix: List[str], first_arg: int, indent: int) -> List[str]: return [ f'{self.mk_indent(indent)}{{', f'{self.mk_indent(indent+1)}pni_consumer_t subconsumer;', f'{self.mk_indent(indent+1)}{self.mk_funcall("consume_described", prefix+["&subconsumer"])};', f'{self.mk_indent(indent+1)}pni_consumer_t consumer = subconsumer;', *super().gen_consume_code(prefix, first_arg, indent + 1), f'{self.mk_indent(indent)}}}', ] class ArrayNode(ListNode): def __init__(self, l: List[ASTNode]): self.list: List[ASTNode] = l super().__init__('array', l, ['pn_type_t']) def gen_emit_code(self, prefix: List[str], first_arg: int, indent: int) -> List[str]: args = self.gen_args(first_arg) return [ f'{self.mk_indent(indent)}for (bool small_encoding = true; ; small_encoding = false) {{', f'{self.mk_indent(indent+1)}pni_compound_context c = ' f'{self.mk_funcall("emit_array", prefix+["small_encoding"]+args)};', f'{self.mk_indent(indent+1)}pni_compound_context compound = c;', *super().gen_emit_code(prefix, first_arg+len(args), indent + 1), f'{self.mk_indent(indent+1)}{self.mk_funcall("emit_end_array", prefix+["small_encoding"])};', f'{self.mk_indent(indent+1)}if (encode_succeeded({", ".join(prefix)})) break;', f'{self.mk_indent(indent)}}}', ] class OptionNode(ASTNode): def __init__(self, i: ASTNode): self.option: ASTNode = i super().__init__('option', ['bool']) self.count_args += i.count_args self.count_consume_args += i.count_consume_args def gen_emit_code(self, prefix: List[str], first_arg: int, indent: int) -> List[str]: arg = f'arg{first_arg}' return [ f'{self.mk_indent(indent)}if ({arg}) {{', *self.option.gen_emit_code(prefix, first_arg+1, indent + 1), f'{self.mk_indent(indent)}}} else {{', *NullNode("null").gen_emit_code(prefix, 0, indent + 1), f'{self.mk_indent(indent)}}}' ] def gen_params(self, first_arg: int) -> List[Tuple[str, str]]: args = super().gen_params(first_arg) return [ *args, *self.option.gen_params(first_arg + len(args)) ] def gen_consume_code(self, prefix: List[str], first_arg: int, indent: int) -> List[str]: arg = f'arg{first_arg}' return [ f'{self.mk_indent(indent)}*{arg} = {self.option.gen_consume_code(prefix, first_arg+1, 0)[0]};' ] def gen_consume_params(self, first_arg: int) -> List[Tuple[str, str]]: args = super().gen_consume_params(first_arg) return [ *args, *self.option.gen_consume_params(first_arg + len(args)) ] class EmptyNode(ASTNode): def __init__(self): super().__init__('empty_frame', []) def gen_emit_code(self, prefix: List[str], first_arg: int, indent: int) -> List[str]: return [] def gen_params(self, first_arg: int) -> List[Tuple[str, str]]: return [] def expect_char(format: str, char: str) -> Tuple[bool, str]: return format[0] == char, format[1:] def parse_list(format: str) -> Tuple[List[ASTNode], str]: rest = format list = [] while not rest[0] == ']': i, rest, = parse_item(rest) list.append(i) return list, rest def parse_item(format: str) -> Tuple[ASTNode, str]: if len(format) == 0: return EmptyNode(), '' if format.startswith('DL['): l, rest = parse_list(format[3:]) b, rest = expect_char(rest, ']') if not b: raise ParseError(format) return DescListNode(l), rest elif format.startswith('D.['): l, rest = parse_list(format[3:]) b, rest = expect_char(rest, ']') if not b: raise ParseError(format) return DescListIgnoreTypeNode(l), rest elif format.startswith('D?LR'): return ASTNode('described_maybe_type_raw', ['bool', 'uint64_t', 'pn_bytes_t'], consume_types=['bool*', 'uint64_t*', 'pn_bytes_t*']), format[4:] elif format.startswith('D?L?.'): return ASTNode('described_maybe_type_maybe_anything', ['bool', 'uint64_t', 'bool'], consume_types=['bool*', 'uint64_t*', 'bool*']), format[4:] elif format.startswith('DLC'): return ASTNode('described_type_copy', ['uint64_t', 'pn_data_t*'], consume_types=['uint64_t*', 'pn_data_t*']), format[3:] elif format.startswith('DL.'): return ASTNode('described_type_anything', ['uint64_t']), format[3:] elif format.startswith('D?L.'): return ASTNode('described_maybe_type_anything', ['bool', 'uint64_t']), format[3:] elif format.startswith('D..'): return NullNode('described_anything'), format[3:] elif format.startswith('D.C'): return ASTNode('described_copy', ['pn_data_t*'], consume_types=['pn_data_t*']), format[3:] elif format.startswith('@T['): l, rest = parse_list(format[3:]) b, rest = expect_char(rest, ']') if not b: raise ParseError(format) return ArrayNode(l), rest elif format.startswith('?'): i, rest = parse_item(format[1:]) return OptionNode(i), rest elif format.startswith('*s'): return ASTNode('counted_symbols', ['size_t', 'char**']), format[2:] elif format.startswith('.'): return NullNode('anything'), format[1:] elif format.startswith('s'): return ASTNode('symbol', ['pn_bytes_t'], consume_types=['pn_bytes_t*']), format[1:] elif format.startswith('S'): return ASTNode('string', ['pn_bytes_t'], consume_types=['pn_bytes_t*']), format[1:] elif format.startswith('C'): return ASTNode('copy', ['pn_data_t*'], consume_types=['pn_data_t*']), format[1:] elif format.startswith('I'): return ASTNode('uint', ['uint32_t']), format[1:] elif format.startswith('H'): return ASTNode('ushort', ['uint16_t']), format[1:] elif format.startswith('n'): return NullNode('null'), format[1:] elif format.startswith('R'): return ASTNode('raw', ['pn_bytes_t'], consume_types=['pn_bytes_t*']), format[1:] elif format.startswith('a'): return ASTNode('atom', ['pn_atom_t*'], consume_types=['pn_atom_t*']), format[1:] elif format.startswith('M'): return ASTNode('multiple', ['pn_data_t*']), format[1:] elif format.startswith('o'): return ASTNode('bool', ['bool']), format[1:] elif format.startswith('B'): return ASTNode('ubyte', ['uint8_t']), format[1:] elif format.startswith('L'): return ASTNode('ulong', ['uint64_t']), format[1:] elif format.startswith('t'): return ASTNode('timestamp', ['pn_timestamp_t']), format[1:] elif format.startswith('z'): return ASTNode('binaryornull', ['size_t', 'const char*'], consume_types=['pn_bytes_t*']), format[1:] elif format.startswith('Z'): return ASTNode('binarynonull', ['size_t', 'const char*'], consume_types=['pn_bytes_t*']), format[1:] else: raise ParseError(format) # Need to translate '@[]*?.' to legal identifier characters # These will be fairly arbitrary and just need to avoid already used codes def make_legal_identifier(s: str) -> str: subs = {'@': 'A', '[': 'E', ']': 'e', '{': 'F', '}': 'f', '*': 'j', '.': 'q', '?': 'Q'} r = '' for c in s: if c in subs: r += subs[c] else: r += c return r def consume_function(name_prefix: str, scan_spec: str, prefix_args: List[Tuple[str, str]]) -> Tuple[str, List[str]]: p, _ = parse_item(scan_spec) params = p.gen_consume_params(0) function_name = name_prefix + '_' + make_legal_identifier(scan_spec) param_list = ', '.join([t+' '+arg for arg, t in prefix_args+params]) function_spec = f'size_t {function_name}({param_list})' function_decl = f'{function_spec};' prefix_params = [a for (a, _) in prefix_args] function_defn = [ f'{function_spec}', '{', f'{p.mk_indent(1)}pni_consumer_t consumer = make_consumer_from_bytes({", ".join(prefix_params)});', *p.gen_consume_code(['&consumer'], 0, 1), f'{p.mk_indent(1)}return consumer.position;', '}', '' ] return function_decl, function_defn def emit_function(name_prefix: str, fill_spec: str, prefix_args: List[Tuple[str, str]]) -> Tuple[str, List[str]]: p, _ = parse_item(fill_spec) params = p.gen_params(0) function_name = name_prefix + '_' + make_legal_identifier(fill_spec) param_list = ', '.join([t+' '+arg for arg, t in prefix_args+params]) function_spec = f'pn_bytes_t {function_name}({param_list})' bytes_args = [('bytes', 'char*'), ('size', 'size_t')] bytes_function_name = name_prefix + '_bytes_' + make_legal_identifier(fill_spec) bytes_param_list = ', '.join([t+' '+arg for arg, t in bytes_args+params]) bytes_function_spec = f'size_t {bytes_function_name}({bytes_param_list})' inner_function_name = name_prefix + '_inner_' + make_legal_identifier(fill_spec) inner_param_list = [('emitter', 'pni_emitter_t*')]+params inner_params = ', '.join([t+' '+arg for arg, t in inner_param_list]) inner_function_spec = f'bool {inner_function_name}({inner_params})' function_decl = f'{function_spec};\n{bytes_function_spec};' prefix_params = [a for (a, _) in prefix_args] args = [a for (a, _) in params] if type(p) is EmptyNode: function_defn = [ f'{function_spec}', '{', f'{p.mk_indent(1)}pni_emitter_t emitter = make_emitter({", ".join(prefix_params)});', f'{p.mk_indent(1)}return make_return(emitter);', '}', '' ] else: function_defn = [ f'{inner_function_spec}', '{', f'{p.mk_indent(1)}pni_compound_context compound = make_compound();', *p.gen_emit_code(['emitter', '&compound'], 0, 1), f'{p.mk_indent(1)}return resize_required(emitter);', '}', '', f'{function_spec}', '{', f'{p.mk_indent(1)}do {{', f'{p.mk_indent(2)}pni_emitter_t emitter = make_emitter_from_rwbytes({", ".join(prefix_params)});', f'{p.mk_indent(2)}if ({p.mk_funcall(inner_function_name, ["&emitter", *args])}) {{', f'{p.mk_indent(3)}{p.mk_funcall("size_buffer_to_emitter", [*prefix_params, "&emitter"])};', f'{p.mk_indent(3)}continue;', f'{p.mk_indent(2)}}}', f'{p.mk_indent(2)}return make_bytes_from_emitter(emitter);', f'{p.mk_indent(1)}}} while (true);', f'{p.mk_indent(1)}/*Unreachable*/', '}', '' f'{bytes_function_spec}', '{', f'{p.mk_indent(1)}pni_emitter_t emitter = make_emitter_from_bytes((pn_rwbytes_t){{.size=size, .start=bytes}});', f'{p.mk_indent(1)}{p.mk_funcall(inner_function_name, ["&emitter", *args])};', f'{p.mk_indent(1)}return make_bytes_from_emitter(emitter).size;', '}', '' ] return function_decl, function_defn prefix_emit_header = """ #include "proton/codec.h" #include "buffer.h" #include <stdbool.h> #include <stddef.h> #include <stdint.h> """ prefix_consume_header = """ #include "proton/codec.h" #include <stdbool.h> #include <stddef.h> #include <stdint.h> """ def output_header(decls: Dict[str, str], prefix: str, file=None): # Output declarations print(prefix, file=file) for fill_spec, decl in sorted(decls.items()): print( f'/* {fill_spec} */\n' f'{decl}', file=file ) prefix_emit_implementation = """ #include "core/emitters.h" """ prefix_consume_implementation = """ #include "core/consumers.h" """ def output_implementation(defns: Dict[str, List[str]], prefix: str, header=None, file=None): # Output implementations if header: print(f'#include "{header}"', file=file) print(prefix, file=file) for fill_spec, defn in sorted(defns.items()): printable_defn = '\n'.join(defn) print( f'/* {fill_spec} */\n' f'{printable_defn}', file=file ) def emit(fill_specs, decl_filename, impl_filename): decls: Dict[str, str] = {} defns: Dict[str, List[str]] = {} for fill_spec in fill_specs: decl, defn = emit_function('pn_amqp_encode', fill_spec, [('buffer', 'pn_rwbytes_t*')]) decls[fill_spec] = decl defns[fill_spec] = defn if decl_filename and impl_filename: with open(decl_filename, 'w') as dfile: output_header(decls, prefix_emit_header, file=dfile) with open(impl_filename, 'w') as ifile: output_implementation(defns, prefix_emit_implementation, header=os.path.basename(decl_filename), file=ifile) else: output_header(decls, prefix_emit_header) output_implementation(defns, prefix_emit_implementation) def consume(scan_specs, decl_filename, impl_filename): decls: Dict[str, str] = {} defns: Dict[str, List[str]] = {} for scan_spec in scan_specs: decl, defn = consume_function('pn_amqp_decode', scan_spec, [('bytes', 'pn_bytes_t')]) decls[scan_spec] = decl defns[scan_spec] = defn if decl_filename and impl_filename: with open(decl_filename, 'w') as dfile: output_header(decls, prefix_consume_header, file=dfile) with open(impl_filename, 'w') as ifile: output_implementation(defns, prefix_consume_implementation, header=os.path.basename(decl_filename), file=ifile) else: output_header(decls, prefix_consume_header) output_implementation(defns, prefix_consume_implementation) def main(): argparser = argparse.ArgumentParser(description='Generate AMQP codec in C for data scan/fill function calls') argparser.add_argument('-i', '--input_specs', help='json file with specs to generate codec output code', type=str, required=True) argparser.add_argument('-o', '--output-base', help='basename for output code', type=str) group = argparser.add_mutually_exclusive_group(required=True) group.add_argument('-e', '--emit', help='generate code to emit amqp', action='store_true') group.add_argument('-c', '--consume', help='generate code to consume amqp', action='store_true') args = argparser.parse_args() if args.output_base: decl_filename = args.output_base + '.h' impl_filename = args.output_base + '.c' else: decl_filename = None impl_filename = None with open(args.input_specs, 'r') as file: jsonfile = json.load(file) if args.emit: fill_specs = jsonfile['fill_specs'] emit(fill_specs, decl_filename, impl_filename) elif args.consume: scan_specs = jsonfile['scan_specs'] consume(scan_specs, decl_filename, impl_filename) if __name__ == '__main__': sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0]) sys.exit(main())