utils/generator.py (598 lines of code) (raw):

#  Licensed to Elasticsearch B.V. under one or more contributor
#  license agreements. See the NOTICE file distributed with
#  this work for additional information regarding copyright
#  ownership. Elasticsearch B.V. licenses this file to you under
#  the Apache License, Version 2.0 (the "License"); you may
#  not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing,
#  software distributed under the License is distributed on an
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
#  KIND, either express or implied.  See the License for the
#  specific language governing permissions and limitations
#  under the License.

"""Generate the query, aggs, response and types modules of the Python DSL
from the Elasticsearch specification schema.
"""

import json
import re
import textwrap
from urllib.error import HTTPError
from urllib.request import urlopen

from jinja2 import Environment, PackageLoader, select_autoescape

from elasticsearch_dsl import VERSION

jinja_env = Environment(
    loader=PackageLoader("utils"),
    autoescape=select_autoescape(),
    trim_blocks=True,
    lstrip_blocks=True,
)
query_py = jinja_env.get_template("query.py.tpl")
aggs_py = jinja_env.get_template("aggs.py.tpl")
response_init_py = jinja_env.get_template("response.__init__.py.tpl")
types_py = jinja_env.get_template("types.py.tpl")

# map with name replacements for Elasticsearch attributes
PROP_REPLACEMENTS = {"from": "from_"}

# map with Elasticsearch type replacements
# keys and values are given in "{namespace}:{name}" format
TYPE_REPLACEMENTS = {
    "_types.query_dsl:DistanceFeatureQuery": "_types.query_dsl:DistanceFeatureQueryBase",
}

# some aggregation types are complicated to determine from the schema, so they
# have their correct type here
AGG_TYPES = {
    "bucket_count_ks_test": "Pipeline",
    "bucket_correlation": "Pipeline",
    "bucket_sort": "Bucket",
    "categorize_text": "Bucket",
    "filter": "Bucket",
    "moving_avg": "Pipeline",
    "variable_width_histogram": "Bucket",
}


def property_to_class_name(name):
    """Convert a snake_case property name to a CamelCase class name.

    Each underscore-separated word is title-cased, except for the word "ip",
    which is rendered as "IP".
    """
    return "".join([w.title() if w != "ip" else "IP" for w in name.split("_")])


def wrapped_doc(text, width=70, initial_indent="", subsequent_indent=""):
    """Formats a docstring as a list of lines of up to the request width."""
    return textwrap.wrap(
        text.replace("\n", " "),
        width=width,
        initial_indent=initial_indent,
        subsequent_indent=subsequent_indent,
    )


def add_dict_type(type_):
    """Add Dict[str, Any] to a Python type hint."""
    if type_.startswith("Union["):
        # extend the existing union instead of nesting a new one
        type_ = f"{type_[:-1]}, Dict[str, Any]]"
    else:
        type_ = f"Union[{type_}, Dict[str, Any]]"
    return type_


def add_seq_dict_type(type_):
    """Add Sequence[Dict[str, Any]] to a Python type hint."""
    if type_.startswith("Union["):
        # extend the existing union instead of nesting a new one
        type_ = f"{type_[:-1]}, Sequence[Dict[str, Any]]]"
    else:
        type_ = f"Union[{type_}, Sequence[Dict[str, Any]]]"
    return type_


def add_not_set(type_):
    """Add DefaultType to a Python type hint."""
    if type_.startswith("Union["):
        # extend the existing union instead of nesting a new one
        type_ = f'{type_[:-1]}, "DefaultType"]'
    else:
        type_ = f'Union[{type_}, "DefaultType"]'
    return type_


def type_for_types_py(type_):
    """Converts a type rendered in a generic way to the format needed in the
    types.py module.

    Quotes are removed from `DefaultType`, `InstrumentedField`, `function.*`
    and `wrappers.*` references, while `"types.X"` references keep their quotes
    but lose the `types.` prefix.
    """
    type_ = type_.replace('"DefaultType"', "DefaultType")
    type_ = type_.replace('"InstrumentedField"', "InstrumentedField")
    type_ = re.sub(r'"(function\.[a-zA-Z0-9_]+)"', r"\1", type_)
    type_ = re.sub(r'"types\.([a-zA-Z0-9_]+)"', r'"\1"', type_)
    type_ = re.sub(r'"(wrappers\.[a-zA-Z0-9_]+)"', r"\1", type_)
    return type_


class ElasticsearchSchema:
    """Operations related to the Elasticsearch schema."""

    def __init__(self):
        """Download and parse the Elasticsearch specification schema.

        The branch matching the major and minor numbers of `VERSION` is tried
        first, and the `main` branch is used as a fallback when the download
        fails with an HTTP error.

        :raises RuntimeError: if the schema cannot be downloaded from any of
            the candidate branches.
        """
        raw_schema = None
        for branch in [f"{VERSION[0]}.{VERSION[1]}", "main"]:
            url = f"https://raw.githubusercontent.com/elastic/elasticsearch-specification/{branch}/output/schema/schema.json"
            try:
                response = urlopen(url)
            except HTTPError:
                continue
            # the context manager guarantees that the HTTP response is closed
            with response:
                print(f"Initializing code generation with '{branch}' specification.")
                raw_schema = response.read()
            break
        if raw_schema is None:
            raise RuntimeError("Could not download Elasticsearch schema")
        self.schema = json.loads(raw_schema)

        # Interfaces collects interfaces that are seen while traversing the schema.
        # Any interfaces collected here are then rendered as Python in the
        # types.py module.
        self.interfaces = []
        self.response_interfaces = []

    def find_type(self, name, namespace=None):
        """Return the first schema type with the given name, or `None` if there
        is no match.

        When `namespace` is `None`, the namespace of the type is not checked.
        """
        for t in self.schema["types"]:
            if t["name"]["name"] == name and (
                namespace is None or t["name"]["namespace"] == namespace
            ):
                return t
        return None

    def inherits_from(self, type_, name, namespace=None):
        """Return `True` if any ancestor of `type_` has the given name (and
        namespace, when one is given), or `False` otherwise.

        The type itself is not checked, only the types reached through its
        "inherits" chain.
        """
        while "inherits" in type_:
            type_ = self.find_type(
                type_["inherits"]["type"]["name"],
                type_["inherits"]["type"]["namespace"],
            )
            if type_["name"]["name"] == name and (
                namespace is None or type_["name"]["namespace"] == namespace
            ):
                return True
        return False

    def get_python_type(self, schema_type, for_response=False):
        """Obtain Python typing details for a given schema type

        This method returns a tuple. The first element is a string with the
        Python type hint. The second element is a dictionary with Python DSL
        specific typing details to be stored in the DslBase._param_defs
        attribute (or None if the type does not need to be in _param_defs).

        When `for_response` is `False`, any new interfaces that are discovered
        are registered to be generated in "request" style, with alternative
        Dict type hints and default values. If `for_response` is `True`,
        interfaces are generated just with their declared type, without
        Dict alternative and without defaults, to help type checkers be more
        effective at parsing response expressions.

        :raises RuntimeError: if no Python type can be determined.
        """
        if schema_type["kind"] == "instance_of":
            type_name = schema_type["type"]
            if type_name["namespace"] in ["_types", "internal", "_builtins"]:
                if type_name["name"] in ["integer", "uint", "long", "ulong"]:
                    return "int", None
                elif type_name["name"] in ["number", "float", "double"]:
                    return "float", None
                elif type_name["name"] == "string":
                    return "str", None
                elif type_name["name"] == "boolean":
                    return "bool", None
                elif type_name["name"] == "binary":
                    return "bytes", None
                elif type_name["name"] == "null":
                    return "None", None
                elif type_name["name"] == "Field":
                    if for_response:
                        return "str", None
                    else:
                        return 'Union[str, "InstrumentedField"]', None
                else:
                    # not an instance of a native type, so we get the type and try again
                    return self.get_python_type(
                        self.find_type(type_name["name"], type_name["namespace"]),
                        for_response=for_response,
                    )
            elif (
                type_name["namespace"] == "_types.query_dsl"
                and type_name["name"] == "QueryContainer"
            ):
                # QueryContainer maps to the DSL's Query class
                return "Query", {"type": "query"}
            elif (
                type_name["namespace"] == "_types.query_dsl"
                and type_name["name"] == "FunctionScoreContainer"
            ):
                # FunctionScoreContainer maps to the DSL's ScoreFunction class
                return "ScoreFunction", {"type": "score_function"}
            elif (
                type_name["namespace"] == "_types.aggregations"
                and type_name["name"] == "Buckets"
            ):
                if for_response:
                    return "Union[Sequence[Any], Dict[str, Any]]", None
                else:
                    return "Dict[str, Query]", {"type": "query", "hash": True}
            elif (
                type_name["namespace"] == "_types.aggregations"
                and type_name["name"] == "CompositeAggregationSource"
            ):
                # CompositeAggregationSource maps to the DSL's Agg class
                return "Agg[_R]", None
            else:
                # for any other instances we get the type and recurse
                type_ = self.find_type(type_name["name"], type_name["namespace"])
                if type_:
                    return self.get_python_type(type_, for_response=for_response)

        elif schema_type["kind"] == "type_alias":
            # for an alias, we use the aliased type
            return self.get_python_type(schema_type["type"], for_response=for_response)

        elif schema_type["kind"] == "array_of":
            # for arrays we use Sequence[element_type]
            type_, param = self.get_python_type(
                schema_type["value"], for_response=for_response
            )
            return f"Sequence[{type_}]", {**param, "multi": True} if param else None

        elif schema_type["kind"] == "dictionary_of":
            # for dicts we use Mapping[key_type, value_type]
            key_type, key_param = self.get_python_type(
                schema_type["key"], for_response=for_response
            )
            value_type, value_param = self.get_python_type(
                schema_type["value"], for_response=for_response
            )
            return f"Mapping[{key_type}, {value_type}]", (
                {**value_param, "hash": True} if value_param else None
            )

        elif schema_type["kind"] == "union_of":
            if (
                len(schema_type["items"]) == 2
                and schema_type["items"][0]["kind"] == "instance_of"
                and schema_type["items"][1]["kind"] == "array_of"
                and schema_type["items"][0] == schema_type["items"][1]["value"]
            ):
                # special kind of unions in the form Union[type, Sequence[type]]
                type_, param = self.get_python_type(
                    schema_type["items"][0], for_response=for_response
                )
                if schema_type["items"][0]["type"]["name"] in [
                    "CompletionSuggestOption",
                    "PhraseSuggestOption",
                    "TermSuggestOption",
                ]:
                    # for suggest types we simplify this type and return just the array form
                    return (
                        f"Sequence[{type_}]",
                        ({"type": param["type"], "multi": True} if param else None),
                    )
                else:
                    # for every other types we produce an union with the two alternatives
                    return (
                        f"Union[{type_}, Sequence[{type_}]]",
                        ({"type": param["type"], "multi": True} if param else None),
                    )
            elif (
                len(schema_type["items"]) == 2
                and schema_type["items"][0]["kind"] == "instance_of"
                and schema_type["items"][1]["kind"] == "instance_of"
                and schema_type["items"][0]["type"]
                == {"name": "T", "namespace": "_spec_utils.PipeSeparatedFlags"}
                and schema_type["items"][1]["type"]
                == {"name": "string", "namespace": "_builtins"}
            ):
                # for now we treat PipeSeparatedFlags as a special case
                if "PipeSeparatedFlags" not in self.interfaces:
                    self.interfaces.append("PipeSeparatedFlags")
                return '"types.PipeSeparatedFlags"', None
            else:
                # generic union type
                # Duplicates are eliminated on the type hint alone: the param
                # details are discarded for generic unions, and they can be
                # dictionaries, which are not hashable.
                member_types = list(
                    dict.fromkeys(
                        [
                            self.get_python_type(t, for_response=for_response)[0]
                            for t in schema_type["items"]
                        ]
                    )
                )
                return "Union[" + ", ".join(member_types) + "]", None

        elif schema_type["kind"] == "enum":
            # enums are mapped to Literal[member, ...]
            return (
                "Literal["
                + ", ".join(
                    [f"\"{member['name']}\"" for member in schema_type["members"]]
                )
                + "]",
                None,
            )

        elif schema_type["kind"] == "interface":
            if schema_type["name"]["namespace"] == "_types.query_dsl":
                # handle specific DSL classes explicitly to map to existing
                # Python DSL classes
                if schema_type["name"]["name"].endswith("RangeQuery"):
                    return '"wrappers.Range[Any]"', None
                elif schema_type["name"]["name"].endswith("ScoreFunction"):
                    # When dropping Python 3.8, use `removesuffix("Function")` instead
                    name = schema_type["name"]["name"][:-8]
                    return f'"function.{name}"', None
                elif schema_type["name"]["name"].endswith("DecayFunction"):
                    return '"function.DecayFunction"', None
                elif schema_type["name"]["name"].endswith("Function"):
                    return f"\"function.{schema_type['name']['name']}\"", None
            elif schema_type["name"]["namespace"] == "_types.analysis" and schema_type[
                "name"
            ]["name"].endswith("Analyzer"):
                # not expanding analyzers at this time, maybe in the future
                return "str, Dict[str, Any]", None

            # to handle other interfaces we generate a type of the same name
            # and add the interface to the types.py module
            if schema_type["name"]["name"] not in self.interfaces:
                self.interfaces.append(schema_type["name"]["name"])
                if for_response:
                    self.response_interfaces.append(schema_type["name"]["name"])
            return f"\"types.{schema_type['name']['name']}\"", None

        elif schema_type["kind"] == "user_defined_value":
            # user_defined_value maps to Python's Any type
            return "Any", None

        raise RuntimeError(f"Cannot find Python type for {schema_type}")

    def add_attribute(self, k, arg, for_types_py=False, for_response=False):
        """Add an attribute to the internal representation of a class.

        This method adds the argument `arg` to the data structure for a class
        stored in `k`. In particular, the argument is added to the `k["args"]`
        list, making sure required arguments are first in the list. If the
        argument is of a type that needs Python DSL specific typing details to
        be stored in the DslBase._param_defs attribute, then this is added to
        `k["params"]`.

        When `for_types_py` is `True`, type hints are formatted in the most
        convenient way for the types.py file. When possible, double quotes are
        removed from types, and for types that are in the same file the quotes
        are kept to prevent forward references, but the "types." namespace is
        removed. When `for_types_py` is `False`, all non-native types use
        quotes and are namespaced.

        When `for_response` is `True`, type hints are not given the optional
        dictionary representation, nor the `DefaultType` used for omitted
        attributes.
        """
        try:
            type_, param = self.get_python_type(arg["type"], for_response=for_response)
        except RuntimeError:
            # types that cannot be resolved are typed as Any
            type_ = "Any"
            param = None
        if not for_response:
            if type_ != "Any":
                if 'Sequence["types.' in type_:
                    type_ = add_seq_dict_type(type_)  # interfaces can be given as dicts
                elif "types." in type_:
                    type_ = add_dict_type(type_)  # interfaces can be given as dicts
                type_ = add_not_set(type_)
        if for_types_py:
            type_ = type_for_types_py(type_)
        required = "(required) " if arg["required"] else ""
        server_default = (
            f" Defaults to `{arg['serverDefault']}` if omitted."
            if arg.get("serverDefault")
            else ""
        )
        doc = wrapped_doc(
            f":arg {arg['name']}: {required}{arg.get('description', '')}{server_default}",
            subsequent_indent=" ",
        )
        arg = {
            "name": PROP_REPLACEMENTS.get(arg["name"], arg["name"]),
            "type": type_,
            "doc": doc,
            "required": arg["required"],
        }
        if param is not None:
            param = {"name": arg["name"], "param": param}
        if arg["required"]:
            # insert in the right place so that all required arguments
            # appear at the top of the argument list: the position of the
            # first argument that is neither positional nor required, or the
            # end of the list if there is none
            position = next(
                (
                    i
                    for i, existing in enumerate(k["args"])
                    if not existing.get("positional")
                    and existing["required"] is False
                ),
                len(k["args"]),
            )
            k["args"].insert(position, arg)
        else:
            k["args"].append(arg)
        if param and "params" in k:
            k["params"].append(param)

    def add_behaviors(self, type_, k, for_types_py=False, for_response=False):
        """Add behaviors reported in the specification of the given type to the
        class representation.

        Only the `_spec_utils:AdditionalProperty` behavior is handled, and any
        other behaviors are ignored. When the key of the additional property
        is a field, positional `_field` and `_value` arguments are appended to
        `k["args"]` and `k["is_single_field"]` is set to `True`.

        :raises RuntimeError: if the key of the additional property is not a
            field.
        """
        if "behaviors" in type_:
            for behavior in type_["behaviors"]:
                if (
                    behavior["type"]["name"] != "AdditionalProperty"
                    or behavior["type"]["namespace"] != "_spec_utils"
                ):
                    # we do not support this behavior, so we ignore it
                    continue
                key_type, _ = self.get_python_type(
                    behavior["generics"][0], for_response=for_response
                )
                if "InstrumentedField" in key_type:
                    value_type, _ = self.get_python_type(
                        behavior["generics"][1], for_response=for_response
                    )
                    if for_types_py:
                        value_type = type_for_types_py(value_type)
                    k["args"].append(
                        {
                            "name": "_field",
                            "type": add_not_set(key_type),
                            "doc": [":arg _field: The field to use in this query."],
                            "required": False,
                            "positional": True,
                        }
                    )
                    k["args"].append(
                        {
                            "name": "_value",
                            "type": add_not_set(add_dict_type(value_type)),
                            "doc": [":arg _value: The query value for the field."],
                            "required": False,
                            "positional": True,
                        }
                    )
                    k["is_single_field"] = True
                else:
                    raise RuntimeError(
                        f"Non-field AdditionalProperty are not supported for interface {type_['name']['namespace']}:{type_['name']['name']}."
                    )

    def property_to_python_class(self, p):
        """Return a list of dictionaries with template data necessary to render
        a schema property as Python classes.

        Used for "container" sub-classes such as `QueryContainer`, where each
        sub-class is represented by a Python DSL class. The first element of
        the returned list describes the class for the property itself, and is
        followed by any sub-classes generated for union type aliases.

        The format of each dictionary is as follows:

        ```python
        {
            "property_name": "the name of the property",
            "name": "the class name to use for the property",
            "docstring": "the formatted docstring as a list of strings",
            "args": [  # a Python description of each class attribute
                "name": "the name of the attribute",
                "type": "the Python type hint for the attribute",
                "doc": ["formatted lines of documentation to add to class docstring"],
                "required": bool,
                "positional": bool,
            ],
            "params": [
                "name": "the attribute name",
                "param": "the param dictionary to include in `_param_defs` for the class",
            ],  # a DSL-specific description of interesting attributes
            "is_single_field": bool  # True for single-key dicts with field key
            "is_multi_field": bool  # True for multi-key dicts with field keys
        }
        ```

        :raises RuntimeError: if the type of the property is not supported.
        """
        k = {
            "property_name": p["name"],
            "name": property_to_class_name(p["name"]),
        }
        k["docstring"] = wrapped_doc(p.get("description") or "")
        other_classes = []
        kind = p["type"]["kind"]
        if kind == "instance_of":
            namespace = p["type"]["type"]["namespace"]
            name = p["type"]["type"]["name"]
            if f"{namespace}:{name}" in TYPE_REPLACEMENTS:
                namespace, name = TYPE_REPLACEMENTS[f"{namespace}:{name}"].split(":")
            if name == "QueryContainer" and namespace == "_types.query_dsl":
                # the property becomes the only attribute of a synthetic interface
                type_ = {
                    "kind": "interface",
                    "properties": [p],
                }
            else:
                type_ = self.find_type(name, namespace)
            if p["name"] in AGG_TYPES:
                k["parent"] = AGG_TYPES[p["name"]]

            if type_["kind"] == "interface":
                # set the correct parent for bucket and pipeline aggregations
                if self.inherits_from(
                    type_, "PipelineAggregationBase", "_types.aggregations"
                ):
                    k["parent"] = "Pipeline"
                elif self.inherits_from(
                    type_, "BucketAggregationBase", "_types.aggregations"
                ):
                    k["parent"] = "Bucket"

                # generate class attributes
                k["args"] = []
                k["params"] = []
                self.add_behaviors(type_, k)
                while True:
                    for arg in type_["properties"]:
                        self.add_attribute(k, arg)
                    # continue with the parent type to add inherited attributes
                    if "inherits" in type_ and "type" in type_["inherits"]:
                        type_ = self.find_type(
                            type_["inherits"]["type"]["name"],
                            type_["inherits"]["type"]["namespace"],
                        )
                    else:
                        break

            elif type_["kind"] == "type_alias":
                if type_["type"]["kind"] == "union_of":
                    # for unions we create sub-classes
                    for other in type_["type"]["items"]:
                        other_class = self.interface_to_python_class(
                            other["type"]["name"],
                            other["type"]["namespace"],
                            for_types_py=False,
                        )
                        other_class["parent"] = k["name"]
                        other_classes.append(other_class)
                else:
                    raise RuntimeError(
                        "Cannot generate code for instances of type_alias instances that are not unions."
                    )

            else:
                raise RuntimeError(
                    f"Cannot generate code for instances of kind '{type_['kind']}'"
                )

        elif kind == "dictionary_of":
            key_type, _ = self.get_python_type(p["type"]["key"])
            if "InstrumentedField" in key_type:
                value_type, _ = self.get_python_type(p["type"]["value"])
                if p["type"]["singleKey"]:
                    # special handling for single-key dicts with field key
                    k["args"] = [
                        {
                            "name": "_field",
                            "type": add_not_set(key_type),
                            "doc": [":arg _field: The field to use in this query."],
                            "required": False,
                            "positional": True,
                        },
                        {
                            "name": "_value",
                            "type": add_not_set(add_dict_type(value_type)),
                            "doc": [":arg _value: The query value for the field."],
                            "required": False,
                            "positional": True,
                        },
                    ]
                    k["is_single_field"] = True
                else:
                    # special handling for multi-key dicts with field keys
                    k["args"] = [
                        {
                            "name": "_fields",
                            "type": f"Optional[Mapping[{key_type}, {value_type}]]",
                            "doc": [
                                ":arg _fields: A dictionary of fields with their values."
                            ],
                            "required": False,
                            "positional": True,
                        },
                    ]
                    k["is_multi_field"] = True
            else:
                raise RuntimeError(f"Cannot generate code for type {p['type']}")

        else:
            raise RuntimeError(f"Cannot generate code for type {p['type']}")
        return [k] + other_classes

    def interface_to_python_class(
        self,
        interface,
        namespace=None,
        *,
        for_types_py=True,
        for_response=False,
    ):
        """Return a dictionary with template data necessary to render an
        interface a Python class.

        This is used to render interfaces that are referenced by container
        classes. Types of kind "response" are also accepted, in which case the
        properties are taken from the body of the response. Any new interfaces
        that are discovered while generating the attributes are registered in
        `self.interfaces`.

        The returned format is as follows:

        ```python
        {
            "name": "the class name to use for the interface class",
            "parent": "the parent class name",
            "args": [  # a Python description of each class attribute
                "name": "the name of the attribute",
                "type": "the Python type hint for the attribute",
                "doc": ["formatted lines of documentation to add to class docstring"],
                "required": bool,
                "positional": bool,
            ],
            "buckets_as_dict": "type"  # optional, only present in aggregation response
                                       # classes that have buckets that can have a list
                                       # or dict representation
        }
        ```

        :raises RuntimeError: if the requested type is not an interface or a
            response.
        """
        type_ = self.find_type(interface, namespace)
        if type_["kind"] not in ["interface", "response"]:
            raise RuntimeError(f"Type {interface} is not an interface")
        if type_["kind"] == "response":
            # we consider responses as interfaces because they also have properties
            # but the location of the properties is different
            type_ = type_["body"]
        k = {"name": interface, "for_response": for_response, "args": []}
        k["docstring"] = wrapped_doc(type_.get("description") or "")
        self.add_behaviors(
            type_, k, for_types_py=for_types_py, for_response=for_response
        )
        generics = []
        while True:
            for arg in type_["properties"]:
                if interface == "ResponseBody" and arg["name"] == "hits":
                    k["args"].append(
                        {
                            "name": "hits",
                            "type": "Sequence[_R]",
                            "doc": [":arg hits: search results"],
                            "required": arg["required"],
                        }
                    )
                elif interface == "ResponseBody" and arg["name"] == "aggregations":
                    # Aggregations are tricky because the DSL client uses a
                    # flexible representation that is difficult to generate
                    # from the schema.
                    # To handle this we let the generator do its work by calling
                    # `add_attribute()`, but then we save the generated attribute
                    # apart and replace it with the DSL's `AggResponse` class.
                    # The generated type is then used in type hints in variables
                    # and methods of this class.
                    self.add_attribute(
                        k, arg, for_types_py=for_types_py, for_response=for_response
                    )
                    k["aggregate_type"] = (
                        k["args"][-1]["type"]
                        .split("Mapping[str, ")[1]
                        .rsplit("]", 1)[0]
                    )
                    k["args"][-1] = {
                        "name": "aggregations",
                        "type": '"AggResponse[_R]"',
                        "doc": [":arg aggregations: aggregation results"],
                        "required": arg["required"],
                    }
                elif (
                    "name" in type_
                    and type_["name"]["name"] == "MultiBucketAggregateBase"
                    and arg["name"] == "buckets"
                ):
                    # Also during aggregation response generation, the "buckets"
                    # attribute that many aggregation responses have is very
                    # complex, supporting over a dozen different aggregation
                    # types via generics, each in array or object configurations.
                    # Typing this attribute proved very difficult. A solution
                    # that worked with mypy and pyright is to type "buckets"
                    # for the list form, and create a `buckets_as_dict`
                    # property that is typed appropriately for accessing the
                    # buckets in dictionary form.
                    # The generic type is assumed to be the first in the list,
                    # which is a simplification that should be improved when a
                    # more complete implementation of generics is added.
                    if generics[0]["type"]["name"] == "Void":
                        generic_type = "Any"
                    else:
                        _g = self.find_type(
                            generics[0]["type"]["name"],
                            generics[0]["type"]["namespace"],
                        )
                        generic_type, _ = self.get_python_type(
                            _g, for_response=for_response
                        )
                        generic_type = type_for_types_py(generic_type)
                    k["args"].append(
                        {
                            "name": arg["name"],
                            # for the type we only include the array form, since
                            # this client does not request the dict form
                            "type": f"Sequence[{generic_type}]",
                            "doc": [
                                ":arg buckets: (required) the aggregation buckets as a list"
                            ],
                            "required": True,
                        }
                    )
                    k["buckets_as_dict"] = generic_type
                else:
                    if interface == "Hit" and arg["name"].startswith("_"):
                        # Python DSL removes the underscore prefix from all the
                        # properties of the hit, so we do the same
                        # (note that this renames the property in the loaded
                        # schema itself)
                        arg["name"] = arg["name"][1:]

                    self.add_attribute(
                        k, arg, for_types_py=for_types_py, for_response=for_response
                    )

            if "inherits" not in type_ or "type" not in type_["inherits"]:
                break

            if "generics" in type_["inherits"]:
                # Generics are only supported for certain specific cases at this
                # time. Here we just save them so that they can be recalled later
                # while traversing over to parent classes to find inherited
                # attributes.
                for generic_type in type_["inherits"]["generics"]:
                    generics.append(generic_type)

            type_ = self.find_type(
                type_["inherits"]["type"]["name"],
                type_["inherits"]["type"]["namespace"],
            )
        return k


def generate_query_py(schema, filename):
    """Generate query.py with all the properties of `QueryContainer` as Python
    classes.
    """
    classes = []
    query_container = schema.find_type("QueryContainer", "_types.query_dsl")
    for p in query_container["properties"]:
        classes += schema.property_to_python_class(p)

    # generated sources are always written as UTF-8, which is the default
    # source encoding for Python
    with open(filename, "wt", encoding="utf-8") as f:
        f.write(query_py.render(classes=classes, parent="Query"))
    print(f"Generated {filename}.")


def generate_aggs_py(schema, filename):
    """Generate aggs.py with all the properties of `AggregationContainer` as
    Python classes.

    Properties that have a truthy `containerProperty` attribute are skipped.
    """
    classes = []
    aggs_container = schema.find_type("AggregationContainer", "_types.aggregations")
    for p in aggs_container["properties"]:
        if "containerProperty" not in p or not p["containerProperty"]:
            classes += schema.property_to_python_class(p)

    # generated sources are always written as UTF-8, which is the default
    # source encoding for Python
    with open(filename, "wt", encoding="utf-8") as f:
        f.write(aggs_py.render(classes=classes, parent="Agg"))
    print(f"Generated {filename}.")


def generate_response_init_py(schema, filename):
    """Generate response/__init__.py with all the response properties
    documented and typed.
    """
    search_response = schema.interface_to_python_class(
        "ResponseBody",
        "_global.search",
        for_types_py=False,
        for_response=True,
    )
    ubq_response = schema.interface_to_python_class(
        "Response",
        "_global.update_by_query",
        for_types_py=False,
        for_response=True,
    )
    # generated sources are always written as UTF-8, which is the default
    # source encoding for Python
    with open(filename, "wt", encoding="utf-8") as f:
        f.write(
            response_init_py.render(response=search_response, ubq_response=ubq_response)
        )
    print(f"Generated {filename}.")


def generate_types_py(schema, filename):
    """Generate types.py with all the interfaces that were collected in the
    schema object while the other modules were generated.
    """
    classes = {}
    # rendering an interface can append more interfaces to `schema.interfaces`,
    # and these are picked up by this same loop
    for interface in schema.interfaces:
        if interface == "PipeSeparatedFlags":
            continue  # handled as a special case
        for_response = interface in schema.response_interfaces
        k = schema.interface_to_python_class(
            interface, for_types_py=True, for_response=for_response
        )
        classes[k["name"]] = k

    # sort classes by being request/response and then by name
    sorted_classes = sorted(
        classes,
        key=lambda i: (i in schema.response_interfaces, i),
    )
    classes_list = [classes[n] for n in sorted_classes]

    # generated sources are always written as UTF-8, which is the default
    # source encoding for Python
    with open(filename, "wt", encoding="utf-8") as f:
        f.write(types_py.render(classes=classes_list))
    print(f"Generated {filename}.")


if __name__ == "__main__":
    schema = ElasticsearchSchema()
    generate_query_py(schema, "elasticsearch_dsl/query.py")
    generate_aggs_py(schema, "elasticsearch_dsl/aggs.py")
    generate_response_init_py(schema, "elasticsearch_dsl/response/__init__.py")
    # generate_response_hit_py(schema, "elasticsearch_dsl/response/hit.py")
    # generate_response_aggs_py(schema, "elasticsearch_dsl/response/aggs.py")
    generate_types_py(schema, "elasticsearch_dsl/types.py")