# elasticsearch/dsl/search_base.py
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import collections.abc
import copy
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generic,
Iterator,
List,
Optional,
Protocol,
Tuple,
Type,
Union,
cast,
overload,
)
from typing_extensions import Self, TypeVar
from .aggs import A, Agg, AggBase
from .document_base import InstrumentedField
from .exceptions import IllegalOperation
from .query import Bool, Q, Query
from .response import Hit, Response
from .utils import _R, AnyUsingType, AttrDict, DslBase, recursive_to_dict
if TYPE_CHECKING:
from .field import Field, Object
class SupportsClone(Protocol):
    """Structural type for objects that can produce a copy of themselves."""

    def _clone(self) -> Self: ...


# Type variable bound to clonable objects; used by QueryProxy/ProxyDescriptor
# so they can be reused for both Search and UpdateByQuery style requests.
_S = TypeVar("_S", bound=SupportsClone)
class QueryProxy(Generic[_S]):
    """
    Simple proxy around DSL objects (queries) that can be called
    (to add query/post_filter) and also allows attribute access which is proxied to
    the wrapped query.
    """

    def __init__(self, search: _S, attr_name: str):
        self._search = search
        # the wrapped query; None until a query is first assigned/added
        self._proxied: Optional[Query] = None
        # name of the attribute on the search object that holds this proxy
        # (e.g. "_query_proxy"), needed to find the proxy again after a clone
        self._attr_name = attr_name

    def __nonzero__(self) -> bool:
        # the proxy is truthy once it wraps an actual query
        return self._proxied is not None

    __bool__ = __nonzero__

    def __call__(self, *args: Any, **kwargs: Any) -> _S:
        """
        Add a query.
        """
        s = self._search._clone()

        # we cannot use self._proxied since we just cloned self._search and
        # need to access the new self on the clone
        proxied = getattr(s, self._attr_name)
        if proxied._proxied is None:
            proxied._proxied = Q(*args, **kwargs)
        else:
            # combine with any existing query using boolean AND
            proxied._proxied &= Q(*args, **kwargs)

        # always return search to be chainable
        return s

    def __getattr__(self, attr_name: str) -> Any:
        # delegate attribute reads to the wrapped query
        return getattr(self._proxied, attr_name)

    def __setattr__(self, attr_name: str, value: Any) -> None:
        if not attr_name.startswith("_"):
            # public attributes are set on the wrapped query; rebuild it from
            # its serialized form first so query objects shared with clones
            # are not mutated in place
            if self._proxied is not None:
                self._proxied = Q(self._proxied.to_dict())
                setattr(self._proxied, attr_name, value)
        super().__setattr__(attr_name, value)

    # pickling support: only the three internal fields make up proxy state
    def __getstate__(self) -> Tuple[_S, Optional[Query], str]:
        return self._search, self._proxied, self._attr_name

    def __setstate__(self, state: Tuple[_S, Optional[Query], str]) -> None:
        self._search, self._proxied, self._attr_name = state
class ProxyDescriptor(Generic[_S]):
    """
    Simple descriptor that enables assigning queries and filters directly:

        s = Search()
        s.query = Q(...)

    Reads return the underlying ``QueryProxy`` stored on the instance;
    writes replace the proxied query with one built from the given dict.
    """

    def __init__(self, name: str):
        self._attr_name = f"_{name}_proxy"

    def __get__(self, instance: Any, owner: object) -> QueryProxy[_S]:
        proxy = getattr(instance, self._attr_name)
        return cast(QueryProxy[_S], proxy)

    def __set__(self, instance: _S, value: Dict[str, Any]) -> None:
        target: QueryProxy[_S] = getattr(instance, self._attr_name)
        target._proxied = Q(value)
class AggsProxy(AggBase[_R], DslBase):
    """Proxy exposing the aggregation API (``.bucket()``, ``.metric()``, ...)
    on a ``Search`` object, serializing only the ``aggs`` sub-tree."""

    name = "aggs"

    def __init__(self, search: "SearchBase[_R]"):
        self._search = search
        # the proxy acts as its own root aggregation container
        self._base = cast("Agg[_R]", self)
        self._params = {"aggs": {}}

    def to_dict(self) -> Dict[str, Any]:
        # strip the wrapping "aggs" key from the serialized form
        serialized = super().to_dict()
        return cast(Dict[str, Any], serialized.get("aggs", {}))
class Request(Generic[_R]):
    """
    Base class for search-style requests. Tracks the connection to use, the
    target indices, the document classes used to wrap hits, query string
    parameters and any extra body options.
    """

    def __init__(
        self,
        using: AnyUsingType = "default",
        index: Optional[Union[str, List[str]]] = None,
        doc_type: Optional[
            Union[type, str, List[Union[type, str]], Dict[str, Union[type, str]]]
        ] = None,
        extra: Optional[Dict[str, Any]] = None,
    ):
        self._using = using

        # normalize index to a list of names, or None meaning "no restriction"
        self._index = None
        if isinstance(index, (tuple, list)):
            self._index = list(index)
        elif index:
            self._index = [index]

        self._doc_type: List[Union[type, str]] = []
        self._doc_type_map: Dict[str, Any] = {}
        if isinstance(doc_type, (tuple, list)):
            self._doc_type.extend(doc_type)
        elif isinstance(doc_type, collections.abc.Mapping):
            # mapping form: keys are type names, values are the callbacks or
            # classes used to wrap matching hits
            self._doc_type.extend(doc_type.keys())
            self._doc_type_map.update(doc_type)
        elif doc_type:
            self._doc_type.append(doc_type)

        # query string parameters passed to the client call
        self._params: Dict[str, Any] = {}
        # extra keys merged into the request body
        self._extra: Dict[str, Any] = extra or {}

    def __eq__(self, other: Any) -> bool:
        # two requests are equal when their params, targets and serialized
        # bodies all match
        return (
            isinstance(other, Request)
            and other._params == self._params
            and other._index == self._index
            and other._doc_type == self._doc_type
            and other.to_dict() == self.to_dict()
        )

    def __copy__(self) -> Self:
        return self._clone()

    def params(self, **kwargs: Any) -> Self:
        """
        Specify query params to be used when executing the search. All the
        keyword arguments will override the current values. See
        https://elasticsearch-py.readthedocs.io/en/latest/api/elasticsearch.html#elasticsearch.Elasticsearch.search
        for all available parameters.

        Example::

            s = Search()
            s = s.params(routing='user-1', preference='local')
        """
        s = self._clone()
        s._params.update(kwargs)
        return s

    def index(self, *index: Union[str, List[str], Tuple[str, ...]]) -> Self:
        """
        Set the index for the search. If called empty it will remove all information.

        Example::

            s = Search()
            s = s.index('twitter-2015.01.01', 'twitter-2015.01.02')
            s = s.index(['twitter-2015.01.01', 'twitter-2015.01.02'])
        """
        # .index() resets
        s = self._clone()
        if not index:
            s._index = None
        else:
            # flatten any lists/tuples passed among the arguments
            indexes = []
            for i in index:
                if isinstance(i, str):
                    indexes.append(i)
                elif isinstance(i, list):
                    indexes += i
                elif isinstance(i, tuple):
                    indexes += list(i)
            # calling .index() with arguments appends to the existing list
            s._index = (self._index or []) + indexes

        return s

    def _resolve_field(self, path: str) -> Optional["Field"]:
        # look up a field definition by dotted path across the registered
        # document classes (only those that have an associated index)
        for dt in self._doc_type:
            if not hasattr(dt, "_index"):
                continue
            field = dt._index.resolve_field(path)
            if field is not None:
                return cast("Field", field)
        return None

    def _resolve_nested(
        self, hit: AttrDict[Any], parent_class: Optional[type] = None
    ) -> Type[_R]:
        # determine the document class for a nested hit by walking the
        # "_nested" chain to build the full dotted path of the nested field
        doc_class = Hit

        nested_path = []
        nesting = hit["_nested"]
        while nesting and "field" in nesting:
            nested_path.append(nesting["field"])
            nesting = nesting.get("_nested")
        nested_path_str = ".".join(nested_path)

        nested_field: Optional["Object"]
        if parent_class is not None and hasattr(parent_class, "_index"):
            nested_field = cast(
                Optional["Object"], parent_class._index.resolve_field(nested_path_str)
            )
        else:
            nested_field = cast(
                Optional["Object"], self._resolve_field(nested_path_str)
            )

        if nested_field is not None:
            return cast(Type[_R], nested_field._doc_class)

        return cast(Type[_R], doc_class)

    def _get_result(
        self, hit: AttrDict[Any], parent_class: Optional[type] = None
    ) -> _R:
        # choose the class (or callback) used to wrap a raw hit
        doc_class: Any = Hit
        dt = hit.get("_type")

        if "_nested" in hit:
            doc_class = self._resolve_nested(hit, parent_class)
        elif dt in self._doc_type_map:
            doc_class = self._doc_type_map[dt]
        else:
            # fall back to the first document class whose _matches accepts
            # this hit
            for doc_type in self._doc_type:
                if hasattr(doc_type, "_matches") and doc_type._matches(hit):
                    doc_class = doc_type
                    break

        # wrap any inner hits in Response objects as well
        for t in hit.get("inner_hits", ()):
            hit["inner_hits"][t] = Response[_R](
                self, hit["inner_hits"][t], doc_class=doc_class
            )

        # prefer the class's from_es factory when available
        callback = getattr(doc_class, "from_es", doc_class)
        return cast(_R, callback(hit))

    def doc_type(
        self, *doc_type: Union[type, str], **kwargs: Callable[[AttrDict[Any]], Any]
    ) -> Self:
        """
        Set the type to search through. You can supply a single value or
        multiple. Values can be strings or subclasses of ``Document``.

        You can also pass in any keyword arguments, mapping a doc_type to a
        callback that should be used instead of the Hit class.

        If no doc_type is supplied any information stored on the instance will
        be erased.

        Example::

            s = Search().doc_type('product', 'store', User, custom=my_callback)
        """
        # .doc_type() resets
        s = self._clone()
        if not doc_type and not kwargs:
            s._doc_type = []
            s._doc_type_map = {}
        else:
            s._doc_type.extend(doc_type)
            s._doc_type.extend(kwargs.keys())
            s._doc_type_map.update(kwargs)
        return s

    def using(self, client: AnyUsingType) -> Self:
        """
        Associate the search request with an elasticsearch client. A fresh copy
        will be returned with current instance remaining unchanged.

        :arg client: an instance of ``elasticsearch.Elasticsearch`` to use or
            an alias to look up in ``elasticsearch.dsl.connections``
        """
        s = self._clone()
        s._using = client
        return s

    def extra(self, **kwargs: Any) -> Self:
        """
        Add extra keys to the request body. Mostly here for backwards
        compatibility.
        """
        s = self._clone()
        # "from" is a Python keyword, so callers pass "from_" instead
        if "from_" in kwargs:
            kwargs["from"] = kwargs.pop("from_")
        s._extra.update(kwargs)
        return s

    def _clone(self) -> Self:
        # shallow copy: new container dicts, shared values
        s = self.__class__(
            using=self._using, index=self._index, doc_type=self._doc_type
        )
        s._doc_type_map = self._doc_type_map.copy()
        s._extra = self._extra.copy()
        s._params = self._params.copy()
        return s

    if TYPE_CHECKING:
        # implemented by subclasses; declared here so the base type-checks
        def to_dict(self) -> Dict[str, Any]: ...
class SearchBase(Request[_R]):
    """
    Base class for search requests: holds the query, post_filter,
    aggregations, kNN clauses, sorting, highlighting, suggestions and all
    other body options, and serializes them with :meth:`to_dict`.
    """

    # descriptors allowing `s.query = Q(...)` / `s.post_filter = Q(...)`
    query = ProxyDescriptor[Self]("query")
    post_filter = ProxyDescriptor[Self]("post_filter")

    # set once the search has been executed; used by search_after()
    _response: Response[_R]

    def __init__(
        self,
        using: AnyUsingType = "default",
        index: Optional[Union[str, List[str]]] = None,
        **kwargs: Any,
    ):
        """
        Search request to elasticsearch.

        :arg using: `Elasticsearch` instance to use
        :arg index: limit the search to index

        All the parameters supplied (or omitted) at creation time can be later
        overridden by methods (`using`, `index` and `doc_type` respectively).
        """
        super().__init__(using=using, index=index, **kwargs)

        self.aggs = AggsProxy[_R](self)
        self._sort: List[Union[str, Dict[str, Dict[str, str]]]] = []
        self._knn: List[Dict[str, Any]] = []
        self._rank: Dict[str, Any] = {}
        self._collapse: Dict[str, Any] = {}
        self._source: Optional[Union[bool, List[str], Dict[str, List[str]]]] = None
        self._highlight: Dict[str, Any] = {}
        self._highlight_opts: Dict[str, Any] = {}
        self._suggest: Dict[str, Any] = {}
        self._script_fields: Dict[str, Any] = {}
        self._response_class = Response[_R]

        # backing proxies for the query/post_filter descriptors above
        self._query_proxy = QueryProxy(self, "query")
        self._post_filter_proxy = QueryProxy(self, "post_filter")

    def filter(self, *args: Any, **kwargs: Any) -> Self:
        """
        Add a query in filter context.
        """
        return self.query(Bool(filter=[Q(*args, **kwargs)]))

    def exclude(self, *args: Any, **kwargs: Any) -> Self:
        """
        Add a negative query in filter context.
        """
        return self.query(Bool(filter=[~Q(*args, **kwargs)]))

    def __getitem__(self, n: Union[int, slice]) -> Self:
        """
        Support slicing the `Search` instance for pagination.

        Slicing equates to the from/size parameters. E.g.::

            s = Search().query(...)[0:25]

        is equivalent to::

            s = Search().query(...).extra(from_=0, size=25)
        """
        s = self._clone()

        if isinstance(n, slice):
            # If negative slicing, abort.
            if n.start and n.start < 0 or n.stop and n.stop < 0:
                raise ValueError("Search does not support negative slicing.")
            slice_start = n.start
            slice_stop = n.stop
        else:  # This is an index lookup, equivalent to slicing by [n:n+1].
            # If negative index, abort.
            if n < 0:
                raise ValueError("Search does not support negative indexing.")
            slice_start = n
            slice_stop = n + 1

        # compute the window currently selected via any earlier slicing
        old_from = s._extra.get("from")
        old_to = None
        if "size" in s._extra:
            old_to = (old_from or 0) + s._extra["size"]

        # new slice is applied relative to the existing window
        new_from = old_from
        if slice_start is not None:
            new_from = (old_from or 0) + slice_start
        new_to = old_to
        if slice_stop is not None:
            new_to = (old_from or 0) + slice_stop
            # never extend past the previously selected window
            if old_to is not None and old_to < new_to:
                new_to = old_to

        if new_from is not None:
            s._extra["from"] = new_from
        if new_to is not None:
            s._extra["size"] = max(0, new_to - (new_from or 0))
        return s

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> Self:
        """
        Construct a new `Search` instance from a raw dict containing the search
        body. Useful when migrating from raw dictionaries.

        Example::

            s = Search.from_dict({
                "query": {
                    "bool": {
                        "must": [...]
                    }
                },
                "aggs": {...}
            })
            s = s.filter('term', published=True)
        """
        s = cls()
        s.update_from_dict(d)
        return s

    def _clone(self) -> Self:
        """
        Return a clone of the current search request. Performs a shallow copy
        of all the underlying objects. Used internally by most state modifying
        APIs.
        """
        s = super()._clone()

        s._response_class = self._response_class
        s._knn = [knn.copy() for knn in self._knn]
        s._rank = self._rank.copy()
        s._collapse = self._collapse.copy()
        s._sort = self._sort[:]
        s._source = copy.copy(self._source) if self._source is not None else None
        s._highlight = self._highlight.copy()
        s._highlight_opts = self._highlight_opts.copy()
        s._suggest = self._suggest.copy()
        s._script_fields = self._script_fields.copy()
        # the proxies on the clone share the proxied query objects; the
        # proxies themselves rebuild the query before mutating it
        for x in ("query", "post_filter"):
            getattr(s, x)._proxied = getattr(self, x)._proxied

        # copy top-level bucket definitions
        if self.aggs._params.get("aggs"):
            s.aggs._params = {"aggs": self.aggs._params["aggs"].copy()}
        return s

    def response_class(self, cls: Type[Response[_R]]) -> Self:
        """
        Override the default wrapper used for the response.
        """
        s = self._clone()
        s._response_class = cls
        return s

    def update_from_dict(self, d: Dict[str, Any]) -> Self:
        """
        Apply options from a serialized body to the current instance. Modifies
        the object in-place. Used mostly by ``from_dict``.
        """
        d = d.copy()
        if "query" in d:
            self.query._proxied = Q(d.pop("query"))
        if "post_filter" in d:
            self.post_filter._proxied = Q(d.pop("post_filter"))

        # both spellings are accepted by Elasticsearch
        aggs = d.pop("aggs", d.pop("aggregations", {}))
        if aggs:
            self.aggs._params = {
                "aggs": {name: A(value) for (name, value) in aggs.items()}
            }
        if "knn" in d:
            self._knn = d.pop("knn")
            # a single knn clause may be given as a bare dict
            if isinstance(self._knn, dict):
                self._knn = [self._knn]
        if "rank" in d:
            self._rank = d.pop("rank")
        if "collapse" in d:
            self._collapse = d.pop("collapse")
        if "sort" in d:
            self._sort = d.pop("sort")
        if "_source" in d:
            self._source = d.pop("_source")
        if "highlight" in d:
            high = d.pop("highlight").copy()
            self._highlight = high.pop("fields")
            self._highlight_opts = high
        if "suggest" in d:
            self._suggest = d.pop("suggest")
            # a global "text" is pushed down into each suggester
            if "text" in self._suggest:
                text = self._suggest.pop("text")
                for s in self._suggest.values():
                    s.setdefault("text", text)
        if "script_fields" in d:
            self._script_fields = d.pop("script_fields")
        # anything not handled above is kept verbatim in the body
        self._extra.update(d)
        return self

    def script_fields(self, **kwargs: Any) -> Self:
        """
        Define script fields to be calculated on hits. See
        https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-script-fields.html
        for more details.

        Example::

            s = Search()
            s = s.script_fields(times_two="doc['field'].value * 2")
            s = s.script_fields(
                times_three={
                    'script': {
                        'lang': 'painless',
                        'source': "doc['field'].value * params.n",
                        'params': {'n': 3}
                    }
                }
            )
        """
        s = self._clone()
        for name in kwargs:
            # shorthand: a plain string is wrapped as an inline script
            if isinstance(kwargs[name], str):
                kwargs[name] = {"script": kwargs[name]}
        s._script_fields.update(kwargs)
        return s

    def knn(
        self,
        field: Union[str, "InstrumentedField"],
        k: int,
        num_candidates: int,
        query_vector: Optional[List[float]] = None,
        query_vector_builder: Optional[Dict[str, Any]] = None,
        boost: Optional[float] = None,
        filter: Optional[Query] = None,
        similarity: Optional[float] = None,
        inner_hits: Optional[Dict[str, Any]] = None,
    ) -> Self:
        """
        Add a k-nearest neighbor (kNN) search.

        :arg field: the vector field to search against as a string or document class attribute
        :arg k: number of nearest neighbors to return as top hits
        :arg num_candidates: number of nearest neighbor candidates to consider per shard
        :arg query_vector: the vector to search for
        :arg query_vector_builder: A dictionary indicating how to build a query vector
        :arg boost: A floating-point boost factor for kNN scores
        :arg filter: query to filter the documents that can match
        :arg similarity: the minimum similarity required for a document to be considered a match, as a float value
        :arg inner_hits: retrieve hits from nested field

        Example::

            s = Search()
            s = s.knn(field='embedding', k=5, num_candidates=10, query_vector=vector,
                      filter=Q('term', category='blog')))
        """
        s = self._clone()
        s._knn.append(
            {
                "field": str(field),  # str() is for InstrumentedField instances
                "k": k,
                "num_candidates": num_candidates,
            }
        )
        # exactly one of query_vector / query_vector_builder must be given
        if query_vector is None and query_vector_builder is None:
            raise ValueError("one of query_vector and query_vector_builder is required")
        if query_vector is not None and query_vector_builder is not None:
            raise ValueError(
                "only one of query_vector and query_vector_builder must be given"
            )
        if query_vector is not None:
            s._knn[-1]["query_vector"] = cast(Any, query_vector)
        if query_vector_builder is not None:
            s._knn[-1]["query_vector_builder"] = query_vector_builder
        if boost is not None:
            s._knn[-1]["boost"] = boost
        if filter is not None:
            if isinstance(filter, Query):
                s._knn[-1]["filter"] = filter.to_dict()
            else:
                s._knn[-1]["filter"] = filter
        if similarity is not None:
            s._knn[-1]["similarity"] = similarity
        if inner_hits is not None:
            s._knn[-1]["inner_hits"] = inner_hits
        return s

    def rank(self, rrf: Optional[Union[bool, Dict[str, Any]]] = None) -> Self:
        """
        Defines a method for combining and ranking results sets from a combination
        of searches. Requires a minimum of 2 results sets.

        :arg rrf: Set to ``True`` or an options dictionary to set the rank method to reciprocal rank fusion (RRF).

        Example::

            s = Search()
            s = s.query('match', content='search text')
            s = s.knn(field='embedding', k=5, num_candidates=10, query_vector=vector)
            s = s.rank(rrf=True)

        Note: This option is in technical preview and may change in the future. The syntax will likely change before GA.
        """
        s = self._clone()
        s._rank = {}
        if rrf is not None and rrf is not False:
            s._rank["rrf"] = {} if rrf is True else rrf
        return s

    def source(
        self,
        fields: Optional[
            Union[
                bool,
                str,
                "InstrumentedField",
                List[Union[str, "InstrumentedField"]],
                Dict[str, List[Union[str, "InstrumentedField"]]],
            ]
        ] = None,
        **kwargs: Any,
    ) -> Self:
        """
        Selectively control how the _source field is returned.

        :arg fields: field name, wildcard string, list of field names or wildcards,
            or dictionary of includes and excludes
        :arg kwargs: ``includes`` or ``excludes`` arguments, when ``fields`` is ``None``.

        When no arguments are given, the entire document will be returned for
        each hit. If ``fields`` is a string or list of strings, the field names or field
        wildcards given will be included. If ``fields`` is a dictionary with keys of
        'includes' and/or 'excludes' the fields will be either included or excluded
        appropriately.

        Calling this multiple times with the same named parameter will override the
        previous values with the new ones.

        Example::

            s = Search()
            s = s.source(includes=['obj1.*'], excludes=["*.description"])

            s = Search()
            s = s.source(includes=['obj1.*']).source(excludes=["*.description"])
        """
        s = self._clone()

        if fields and kwargs:
            raise ValueError("You cannot specify fields and kwargs at the same time.")

        @overload
        def ensure_strings(fields: str) -> str: ...

        @overload
        def ensure_strings(fields: "InstrumentedField") -> str: ...

        @overload
        def ensure_strings(
            fields: List[Union[str, "InstrumentedField"]],
        ) -> List[str]: ...

        @overload
        def ensure_strings(
            fields: Dict[str, List[Union[str, "InstrumentedField"]]],
        ) -> Dict[str, List[str]]: ...

        def ensure_strings(
            fields: Union[
                str,
                "InstrumentedField",
                List[Union[str, "InstrumentedField"]],
                Dict[str, List[Union[str, "InstrumentedField"]]],
            ],
        ) -> Union[str, List[str], Dict[str, List[str]]]:
            # normalize InstrumentedField instances (and iterables/dicts of
            # them) into plain strings for serialization
            if isinstance(fields, dict):
                return {k: ensure_strings(v) for k, v in fields.items()}
            elif not isinstance(fields, (str, InstrumentedField)):
                # we assume that if `fields` is not a any of [dict, str,
                # InstrumentedField] then it is an iterable of strings or
                # InstrumentedFields, so we convert them to a plain list of
                # strings
                return [str(f) for f in fields]
            else:
                return str(fields)

        if fields is not None:
            s._source = fields if isinstance(fields, bool) else ensure_strings(fields)  # type: ignore[assignment]
            return s

        if kwargs and not isinstance(s._source, dict):
            s._source = {}

        if isinstance(s._source, dict):
            for key, value in kwargs.items():
                if value is None:
                    # passing None removes a previously set includes/excludes
                    try:
                        del s._source[key]
                    except KeyError:
                        pass
                else:
                    s._source[key] = ensure_strings(value)

        return s

    def sort(
        self, *keys: Union[str, "InstrumentedField", Dict[str, Dict[str, str]]]
    ) -> Self:
        """
        Add sorting information to the search request. If called without
        arguments it will remove all sort requirements. Otherwise it will
        replace them. Acceptable arguments are::

            'some.field'
            '-some.other.field'
            {'different.field': {'any': 'dict'}}

        so for example::

            s = Search().sort(
                'category',
                '-title',
                {"price" : {"order" : "asc", "mode" : "avg"}}
            )

        will sort by ``category``, ``title`` (in descending order) and
        ``price`` in ascending order using the ``avg`` mode.

        The API returns a copy of the Search object and can thus be chained.
        """
        s = self._clone()
        s._sort = []
        for k in keys:
            if not isinstance(k, dict):
                sort_field = str(k)
                # a leading "-" means descending order
                if sort_field.startswith("-"):
                    if sort_field[1:] == "_score":
                        raise IllegalOperation("Sorting by `-_score` is not allowed.")
                    s._sort.append({sort_field[1:]: {"order": "desc"}})
                else:
                    s._sort.append(sort_field)
            else:
                s._sort.append(k)
        return s

    def collapse(
        self,
        field: Optional[Union[str, "InstrumentedField"]] = None,
        inner_hits: Optional[Dict[str, Any]] = None,
        max_concurrent_group_searches: Optional[int] = None,
    ) -> Self:
        """
        Add collapsing information to the search request.

        If called without providing ``field``, it will remove all collapse
        requirements, otherwise it will replace them with the provided
        arguments.

        The API returns a copy of the Search object and can thus be chained.
        """
        s = self._clone()
        s._collapse = {}

        if field is None:
            return s

        s._collapse["field"] = str(field)
        if inner_hits:
            s._collapse["inner_hits"] = inner_hits
        if max_concurrent_group_searches:
            s._collapse["max_concurrent_group_searches"] = max_concurrent_group_searches
        return s

    def highlight_options(self, **kwargs: Any) -> Self:
        """
        Update the global highlighting options used for this request. For
        example::

            s = Search()
            s = s.highlight_options(order='score')
        """
        s = self._clone()
        s._highlight_opts.update(kwargs)
        return s

    def highlight(
        self, *fields: Union[str, "InstrumentedField"], **kwargs: Any
    ) -> Self:
        """
        Request highlighting of some fields. All keyword arguments passed in will be
        used as parameters for all the fields in the ``fields`` parameter. Example::

            Search().highlight('title', 'body', fragment_size=50)

        will produce the equivalent of::

            {
                "highlight": {
                    "fields": {
                        "body": {"fragment_size": 50},
                        "title": {"fragment_size": 50}
                    }
                }
            }

        If you want to have different options for different fields
        you can call ``highlight`` twice::

            Search().highlight('title', fragment_size=50).highlight('body', fragment_size=100)

        which will produce::

            {
                "highlight": {
                    "fields": {
                        "body": {"fragment_size": 100},
                        "title": {"fragment_size": 50}
                    }
                }
            }
        """
        s = self._clone()
        for f in fields:
            s._highlight[str(f)] = kwargs
        return s

    def suggest(
        self,
        name: str,
        text: Optional[str] = None,
        regex: Optional[str] = None,
        **kwargs: Any,
    ) -> Self:
        """
        Add a suggestions request to the search.

        :arg name: name of the suggestion
        :arg text: text to suggest on

        All keyword arguments will be added to the suggestions body. For example::

            s = Search()
            s = s.suggest('suggestion-1', 'Elasticsearch', term={'field': 'body'})

            # regex query for Completion Suggester
            s = Search()
            s = s.suggest('suggestion-1', regex='py[thon|py]', completion={'field': 'body'})
        """
        if text is None and regex is None:
            raise ValueError('You have to pass "text" or "regex" argument.')
        if text and regex:
            raise ValueError('You can only pass either "text" or "regex" argument.')
        if regex and "completion" not in kwargs:
            # regex suggestions are only supported by the completion suggester
            raise ValueError(
                '"regex" argument must be passed with "completion" keyword argument.'
            )

        s = self._clone()
        if regex:
            s._suggest[name] = {"regex": regex}
        elif text:
            if "completion" in kwargs:
                # the completion suggester calls its input "prefix"
                s._suggest[name] = {"prefix": text}
            else:
                s._suggest[name] = {"text": text}
        s._suggest[name].update(kwargs)
        return s

    def search_after(self) -> Self:
        """
        Return a ``Search`` instance that retrieves the next page of results.

        This method provides an easy way to paginate a long list of results using
        the ``search_after`` option. For example::

            page_size = 20
            s = Search()[:page_size].sort("date")
            while True:
                # get a page of results
                r = await s.execute()

                # do something with this page of results

                # exit the loop if we reached the end
                if len(r.hits) < page_size:
                    break

                # get a search object with the next page of results
                s = s.search_after()

        Note that the ``search_after`` option requires the search to have an
        explicit ``sort`` order.
        """
        if not hasattr(self, "_response"):
            raise ValueError("A search must be executed before using search_after")
        return cast(Self, self._response.search_after())

    def to_dict(self, count: bool = False, **kwargs: Any) -> Dict[str, Any]:
        """
        Serialize the search into the dictionary that will be sent over as the
        request's body.

        :arg count: a flag to specify if we are interested in a body for count -
            no aggregations, no pagination bounds etc.

        All additional keyword arguments will be included into the dictionary.
        """
        d: Dict[str, Any] = {}

        if self.query:
            d["query"] = recursive_to_dict(self.query)

        if self._knn:
            # a single clause is serialized as a bare object
            if len(self._knn) == 1:
                d["knn"] = self._knn[0]
            else:
                d["knn"] = self._knn

        if self._rank:
            d["rank"] = self._rank

        # count request doesn't care for sorting and other things
        if not count:
            if self.post_filter:
                d["post_filter"] = recursive_to_dict(self.post_filter.to_dict())

            if self.aggs.aggs:
                d.update(recursive_to_dict(self.aggs.to_dict()))

            if self._sort:
                d["sort"] = self._sort

            if self._collapse:
                d["collapse"] = self._collapse

            d.update(recursive_to_dict(self._extra))

            if self._source not in (None, {}):
                d["_source"] = self._source

            if self._highlight:
                d["highlight"] = {"fields": self._highlight}
                d["highlight"].update(self._highlight_opts)

            if self._suggest:
                d["suggest"] = self._suggest

            if self._script_fields:
                d["script_fields"] = self._script_fields

        d.update(recursive_to_dict(kwargs))
        return d
class MultiSearchBase(Request[_R]):
    """
    Combine multiple :class:`~elasticsearch.dsl.Search` objects into a single
    request.
    """

    def __init__(self, **kwargs: Any):
        super().__init__(**kwargs)
        self._searches: List[SearchBase[_R]] = []

    def __getitem__(self, key: Union[int, slice]) -> Any:
        return self._searches[key]

    def __iter__(self) -> Iterator[SearchBase[_R]]:
        return iter(self._searches)

    def _clone(self) -> Self:
        # shallow-copy the list of searches on top of the base clone
        clone = super()._clone()
        clone._searches = list(self._searches)
        return clone

    def add(self, search: SearchBase[_R]) -> Self:
        """
        Return a copy of this request with ``search`` appended. Example::

            ms = MultiSearch(index='my-index')
            ms = ms.add(Search(doc_type=Category).filter('term', category='python'))
            ms = ms.add(Search(doc_type=Blog))
        """
        clone = self._clone()
        clone._searches.append(search)
        return clone

    def to_dict(self) -> List[Dict[str, Any]]:  # type: ignore[override]
        # msearch body: alternating header (index + params) and body entries
        serialized: List[Dict[str, Any]] = []
        for search in self._searches:
            header: Dict[str, Any] = {}
            if search._index:
                header["index"] = cast(Any, search._index)
            header.update(search._params)

            serialized.append(header)
            serialized.append(search.to_dict())
        return serialized