in opensearch_dsl/aggs.py [0:0]
def A(name_or_agg, filter=None, **params):
    """Build an aggregation object from a name, a raw mapping, or an ``Agg``.

    Three call shapes are accepted:

    * ``A("terms", field="tags")`` -- the class is resolved with
      ``Agg.get_dsl_class(name_or_agg)`` and instantiated with ``params``.
    * ``A({"terms": {"field": "tags"}, "aggs": {...}, "meta": {...}})`` --
      the mapping must hold exactly one aggregation entry besides the
      optional ``"aggs"`` and ``"meta"`` keys; truthy ``aggs``/``meta``
      values are merged into the parameters of that aggregation.
    * ``A(agg_instance)`` -- an existing ``Agg`` is returned unchanged.

    :arg name_or_agg: aggregation name, mapping describing one aggregation,
        or an ``Agg`` instance.
    :arg filter: only allowed when ``name_or_agg == "filter"``; forwarded to
        the aggregation class as the ``filter`` keyword.
    :arg params: keyword arguments for the aggregation class; only allowed
        together with a name.
    :raises ValueError: if ``filter`` is combined with anything other than
        the ``"filter"`` aggregation, if ``params`` accompany a mapping or an
        ``Agg`` instance, or if the mapping does not contain exactly one
        aggregation.
    """
    if filter is not None:
        if name_or_agg != "filter":
            # Wrap the operand in a 1-tuple: a bare tuple would be unpacked
            # by ``%`` and raise TypeError instead of this ValueError.
            raise ValueError(
                "Aggregation %r doesn't accept positional argument 'filter'."
                % (name_or_agg,)
            )
        params["filter"] = filter

    # {"terms": {"field": "tags"}, "aggs": {...}}
    if isinstance(name_or_agg, collections_abc.Mapping):
        if params:
            raise ValueError("A() cannot accept parameters when passing in a dict.")
        # Shallow copy so the caller's mapping is not modified in-place.
        # dict() is used because ``.copy()`` is not part of the Mapping
        # interface, so read-only/custom mappings would otherwise fail here.
        agg = dict(name_or_agg)
        # pop out nested aggs
        aggs = agg.pop("aggs", None)
        # pop out meta data
        meta = agg.pop("meta", None)
        # what is left should be {"terms": {"field": "tags"}}
        if len(agg) != 1:
            raise ValueError(
                'A() can only accept dict with an aggregation ({"terms": {...}}). '
                "Instead it got (%r)" % (name_or_agg,)
            )
        agg_type, body = agg.popitem()
        if aggs or meta:
            # Copy the body once before adding keys, so the nested mapping
            # owned by the caller stays untouched.
            body = dict(body)
            if aggs:
                body["aggs"] = aggs
            if meta:
                body["meta"] = meta
        # NOTE(review): _expand__to_dot=False is only passed on this path --
        # presumably because raw dict keys are already final; confirm against
        # the Agg base class.
        return Agg.get_dsl_class(agg_type)(_expand__to_dot=False, **body)

    # Terms(...) just return the nested agg
    if isinstance(name_or_agg, Agg):
        if params:
            raise ValueError(
                "A() cannot accept parameters when passing in an Agg object."
            )
        return name_or_agg

    # "terms", field="tags"
    return Agg.get_dsl_class(name_or_agg)(**params)