in utils/generator.py [0:0]
def property_to_python_class(self, p):
"""Return a dictionary with template data necessary to render a schema
property as a Python class.
Used for "container" sub-classes such as `QueryContainer`, where each
sub-class is represented by a Python DSL class.
The format is as follows:
```python
{
"property_name": "the name of the property",
"name": "the class name to use for the property",
"docstring": "the formatted docstring as a list of strings",
"args": [ # a Python description of each class attribute
"name": "the name of the attribute",
"type": "the Python type hint for the attribute",
"doc": ["formatted lines of documentation to add to class docstring"],
"required": bool,
"positional": bool,
],
"params": [
"name": "the attribute name",
"param": "the param dictionary to include in `_param_defs` for the class",
], # a DSL-specific description of interesting attributes
"is_single_field": bool # True for single-key dicts with field key
"is_multi_field": bool # True for multi-key dicts with field keys
}
```
"""
k = {
"property_name": p["name"],
"name": property_to_class_name(p["name"]),
}
k["docstring"] = wrapped_doc(p.get("description") or "")
other_classes = []
kind = p["type"]["kind"]
if kind == "instance_of":
namespace = p["type"]["type"]["namespace"]
name = p["type"]["type"]["name"]
if f"{namespace}:{name}" in TYPE_REPLACEMENTS:
namespace, name = TYPE_REPLACEMENTS[f"{namespace}:{name}"].split(":")
if name == "QueryContainer" and namespace == "_types.query_dsl":
type_ = {
"kind": "interface",
"properties": [p],
}
else:
type_ = self.find_type(name, namespace)
if p["name"] in AGG_TYPES:
k["parent"] = AGG_TYPES[p["name"]]
if type_["kind"] == "interface":
# set the correct parent for bucket and pipeline aggregations
if self.inherits_from(
type_, "PipelineAggregationBase", "_types.aggregations"
):
k["parent"] = "Pipeline"
elif self.inherits_from(
type_, "BucketAggregationBase", "_types.aggregations"
):
k["parent"] = "Bucket"
# generate class attributes
k["args"] = []
k["params"] = []
self.add_behaviors(type_, k)
while True:
for arg in type_["properties"]:
self.add_attribute(k, arg)
if "inherits" in type_ and "type" in type_["inherits"]:
type_ = self.find_type(
type_["inherits"]["type"]["name"],
type_["inherits"]["type"]["namespace"],
)
else:
break
elif type_["kind"] == "type_alias":
if type_["type"]["kind"] == "union_of":
# for unions we create sub-classes
for other in type_["type"]["items"]:
other_class = self.interface_to_python_class(
other["type"]["name"],
other["type"]["namespace"],
for_types_py=False,
)
other_class["parent"] = k["name"]
other_classes.append(other_class)
else:
raise RuntimeError(
"Cannot generate code for instances of type_alias instances that are not unions."
)
else:
raise RuntimeError(
f"Cannot generate code for instances of kind '{type_['kind']}'"
)
elif kind == "dictionary_of":
key_type, _ = self.get_python_type(p["type"]["key"])
if "InstrumentedField" in key_type:
value_type, _ = self.get_python_type(p["type"]["value"])
if p["type"]["singleKey"]:
# special handling for single-key dicts with field key
k["args"] = [
{
"name": "_field",
"type": add_not_set(key_type),
"doc": [":arg _field: The field to use in this query."],
"required": False,
"positional": True,
},
{
"name": "_value",
"type": add_not_set(add_dict_type(value_type)),
"doc": [":arg _value: The query value for the field."],
"required": False,
"positional": True,
},
]
k["is_single_field"] = True
else:
# special handling for multi-key dicts with field keys
k["args"] = [
{
"name": "_fields",
"type": f"Optional[Mapping[{key_type}, {value_type}]]",
"doc": [
":arg _fields: A dictionary of fields with their values."
],
"required": False,
"positional": True,
},
]
k["is_multi_field"] = True
else:
raise RuntimeError(f"Cannot generate code for type {p['type']}")
else:
raise RuntimeError(f"Cannot generate code for type {p['type']}")
return [k] + other_classes