in utils/generator.py [0:0]
def get_python_type(self, schema_type, for_response=False):
"""Obtain Python typing details for a given schema type
This method returns a tuple. The first element is a string with the
Python type hint. The second element is a dictionary with Python DSL
specific typing details to be stored in the DslBase._param_defs
attribute (or None if the type does not need to be in _param_defs).
When `for_response` is `False`, any new interfaces that are discovered
are registered to be generated in "request" style, with alternative
Dict type hints and default values. If `for_response` is `True`,
interfaces are generated just with their declared type, without
Dict alternative and without defaults, to help type checkers be more
effective at parsing response expressions.
"""
if schema_type["kind"] == "instance_of":
type_name = schema_type["type"]
if type_name["namespace"] in ["_types", "internal", "_builtins"]:
if type_name["name"] in ["integer", "uint", "long", "ulong"]:
return "int", None
elif type_name["name"] in ["number", "float", "double"]:
return "float", None
elif type_name["name"] == "string":
return "str", None
elif type_name["name"] == "boolean":
return "bool", None
elif type_name["name"] == "binary":
return "bytes", None
elif type_name["name"] == "null":
return "None", None
elif type_name["name"] == "Field":
if for_response:
return "str", None
else:
return 'Union[str, "InstrumentedField"]', None
else:
# not an instance of a native type, so we get the type and try again
return self.get_python_type(
self.find_type(type_name["name"], type_name["namespace"]),
for_response=for_response,
)
elif (
type_name["namespace"] == "_types.query_dsl"
and type_name["name"] == "QueryContainer"
):
# QueryContainer maps to the DSL's Query class
return "Query", {"type": "query"}
elif (
type_name["namespace"] == "_types.query_dsl"
and type_name["name"] == "FunctionScoreContainer"
):
# FunctionScoreContainer maps to the DSL's ScoreFunction class
return "ScoreFunction", {"type": "score_function"}
elif (
type_name["namespace"] == "_types.aggregations"
and type_name["name"] == "Buckets"
):
if for_response:
return "Union[Sequence[Any], Dict[str, Any]]", None
else:
return "Dict[str, Query]", {"type": "query", "hash": True}
elif (
type_name["namespace"] == "_types.aggregations"
and type_name["name"] == "CompositeAggregationSource"
):
# QueryContainer maps to the DSL's Query class
return "Agg[_R]", None
else:
# for any other instances we get the type and recurse
type_ = self.find_type(type_name["name"], type_name["namespace"])
if type_:
return self.get_python_type(type_, for_response=for_response)
elif schema_type["kind"] == "type_alias":
# for an alias, we use the aliased type
return self.get_python_type(schema_type["type"], for_response=for_response)
elif schema_type["kind"] == "array_of":
# for arrays we use Sequence[element_type]
type_, param = self.get_python_type(
schema_type["value"], for_response=for_response
)
return f"Sequence[{type_}]", {**param, "multi": True} if param else None
elif schema_type["kind"] == "dictionary_of":
# for dicts we use Mapping[key_type, value_type]
key_type, key_param = self.get_python_type(
schema_type["key"], for_response=for_response
)
value_type, value_param = self.get_python_type(
schema_type["value"], for_response=for_response
)
return f"Mapping[{key_type}, {value_type}]", (
{**value_param, "hash": True} if value_param else None
)
elif schema_type["kind"] == "union_of":
if (
len(schema_type["items"]) == 2
and schema_type["items"][0]["kind"] == "instance_of"
and schema_type["items"][1]["kind"] == "array_of"
and schema_type["items"][0] == schema_type["items"][1]["value"]
):
# special kind of unions in the form Union[type, Sequence[type]]
type_, param = self.get_python_type(
schema_type["items"][0], for_response=for_response
)
if schema_type["items"][0]["type"]["name"] in [
"CompletionSuggestOption",
"PhraseSuggestOption",
"TermSuggestOption",
]:
# for suggest types we simplify this type and return just the array form
return (
f"Sequence[{type_}]",
({"type": param["type"], "multi": True} if param else None),
)
else:
# for every other types we produce an union with the two alternatives
return (
f"Union[{type_}, Sequence[{type_}]]",
({"type": param["type"], "multi": True} if param else None),
)
elif (
len(schema_type["items"]) == 2
and schema_type["items"][0]["kind"] == "instance_of"
and schema_type["items"][1]["kind"] == "instance_of"
and schema_type["items"][0]["type"]
== {"name": "T", "namespace": "_spec_utils.PipeSeparatedFlags"}
and schema_type["items"][1]["type"]
== {"name": "string", "namespace": "_builtins"}
):
# for now we treat PipeSeparatedFlags as a special case
if "PipeSeparatedFlags" not in self.interfaces:
self.interfaces.append("PipeSeparatedFlags")
return '"types.PipeSeparatedFlags"', None
else:
# generic union type
types = list(
dict.fromkeys( # eliminate duplicates
[
self.get_python_type(t, for_response=for_response)
for t in schema_type["items"]
]
)
)
return "Union[" + ", ".join([type_ for type_, _ in types]) + "]", None
elif schema_type["kind"] == "enum":
# enums are mapped to Literal[member, ...]
return (
"Literal["
+ ", ".join(
[f"\"{member['name']}\"" for member in schema_type["members"]]
)
+ "]",
None,
)
elif schema_type["kind"] == "interface":
if schema_type["name"]["namespace"] == "_types.query_dsl":
# handle specific DSL classes explicitly to map to existing
# Python DSL classes
if schema_type["name"]["name"].endswith("RangeQuery"):
return '"wrappers.Range[Any]"', None
elif schema_type["name"]["name"].endswith("ScoreFunction"):
# When dropping Python 3.8, use `removesuffix("Function")` instead
name = schema_type["name"]["name"][:-8]
return f'"function.{name}"', None
elif schema_type["name"]["name"].endswith("DecayFunction"):
return '"function.DecayFunction"', None
elif schema_type["name"]["name"].endswith("Function"):
return f"\"function.{schema_type['name']['name']}\"", None
elif schema_type["name"]["namespace"] == "_types.analysis" and schema_type[
"name"
]["name"].endswith("Analyzer"):
# not expanding analyzers at this time, maybe in the future
return "str, Dict[str, Any]", None
# to handle other interfaces we generate a type of the same name
# and add the interface to the interfaces.py module
if schema_type["name"]["name"] not in self.interfaces:
self.interfaces.append(schema_type["name"]["name"])
if for_response:
self.response_interfaces.append(schema_type["name"]["name"])
return f"\"types.{schema_type['name']['name']}\"", None
elif schema_type["kind"] == "user_defined_value":
# user_defined_value maps to Python's Any type
return "Any", None
raise RuntimeError(f"Cannot find Python type for {schema_type}")