elasticsearch/dsl/_sync/search.py (119 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import contextlib
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterator,
List,
Optional,
cast,
)
from typing_extensions import Self
from elasticsearch.exceptions import ApiError
from elasticsearch.helpers import scan
from ..connections import get_connection
from ..response import Response
from ..search_base import MultiSearchBase, SearchBase
from ..utils import _R, AttrDict, UsingType
class Search(SearchBase[_R]):
    """
    Synchronous search request.

    Every method that talks to Elasticsearch obtains its client by passing
    ``self._using`` to ``get_connection()``.
    """

    _using: UsingType

    def __iter__(self) -> Iterator[_R]:
        """
        Iterate over the hits.

        The search is executed lazily: ``execute()`` is only called the first
        time an item is requested from the returned iterator.
        """

        class ResultsIterator(Iterator[_R]):
            def __init__(self, search: Search[_R]):
                self.search = search
                # created on the first call to __next__()
                self.iterator: Optional[Iterator[_R]] = None

            def __next__(self) -> _R:
                if self.iterator is None:
                    self.iterator = iter(self.search.execute())
                # StopIteration raised by the inner iterator propagates as is
                return next(self.iterator)

        return ResultsIterator(self)

    def count(self) -> int:
        """
        Return the number of hits matching the query and filters. Note that
        only the actual number is returned.

        If a response has already been stored on this object and its total
        has the ``"eq"`` relation, that value is returned without issuing a
        new request. Otherwise a ``count`` request is sent.
        """
        if hasattr(self, "_response") and self._response.hits.total.relation == "eq":  # type: ignore[attr-defined]
            return cast(int, self._response.hits.total.value)  # type: ignore[attr-defined]

        es = get_connection(self._using)

        d = self.to_dict(count=True)
        # TODO: failed shards detection
        resp = es.count(
            index=self._index,
            query=cast(Optional[Dict[str, Any]], d.get("query", None)),
            **self._params,
        )

        return cast(int, resp["count"])

    def execute(self, ignore_cache: bool = False) -> Response[_R]:
        """
        Execute the search and return an instance of ``Response`` wrapping all
        the data.

        :arg ignore_cache: if set to ``True``, consecutive calls will hit
            ES, while cached result will be ignored. Defaults to `False`
        """
        if ignore_cache or not hasattr(self, "_response"):
            es = get_connection(self._using)

            # the response is stored on the instance so later calls can reuse it
            self._response = self._response_class(
                self,
                (
                    es.search(index=self._index, body=self.to_dict(), **self._params)
                ).body,
            )
        return self._response

    def scan(self) -> Iterator[_R]:
        """
        Turn the search into a scan search and return a generator that will
        iterate over all the documents matching the query.

        Use the ``params`` method to specify any additional arguments you wish to
        pass to the underlying ``scan`` helper from ``elasticsearch-py`` -
        https://elasticsearch-py.readthedocs.io/en/latest/helpers.html#scan

        The ``iterate()`` method should be preferred, as it provides similar
        functionality using an Elasticsearch point in time.
        """
        es = get_connection(self._using)

        for hit in scan(es, query=self.to_dict(), index=self._index, **self._params):
            yield self._get_result(cast(AttrDict[Any], hit))

    def delete(self) -> AttrDict[Any]:
        """
        ``delete()`` executes the query by delegating to ``delete_by_query()``.

        Use the ``params`` method to specify any additional arguments you wish to
        pass to the underlying ``delete_by_query`` helper from ``elasticsearch-py`` -
        https://elasticsearch-py.readthedocs.io/en/latest/api/elasticsearch.html#elasticsearch.Elasticsearch.delete_by_query

        An index must have been set on the search; this is checked with an
        ``assert`` before the request is sent.
        """
        es = get_connection(self._using)
        assert self._index is not None

        return AttrDict(
            cast(
                Dict[str, Any],
                es.delete_by_query(
                    index=self._index, body=self.to_dict(), **self._params
                ),
            )
        )

    @contextlib.contextmanager
    def point_in_time(self, keep_alive: str = "1m") -> Iterator[Self]:
        """
        Open a point in time (pit) that can be used across several searches.

        This method implements a context manager that returns a search object
        configured to operate within the created pit. The pit is closed when
        the context exits, including when the body of the ``with`` block
        raises an exception.

        :arg keep_alive: the time to live for the point in time, renewed with each search request
        """
        es = get_connection(self._using)

        pit = es.open_point_in_time(index=self._index or "*", keep_alive=keep_alive)
        try:
            # the index is cleared on the derived search; the pit id is sent instead
            search = self.index().extra(pit={"id": pit["id"], "keep_alive": keep_alive})
            if not search._sort:
                search = search.sort("_shard_doc")
            yield search
        finally:
            # always release the pit, even on errors or early generator close
            es.close_point_in_time(id=pit["id"])

    def iterate(self, keep_alive: str = "1m") -> Iterator[_R]:
        """
        Return a generator that iterates over all the documents matching the query.

        This method uses a point in time to provide consistent results even when
        the index is changing. It should be preferred over ``scan()``.

        :arg keep_alive: the time to live for the point in time, renewed with each new search request
        """
        with self.point_in_time(keep_alive=keep_alive) as s:
            while True:
                r = s.execute()
                for hit in r:
                    yield hit
                # an empty page means there is nothing left to fetch
                if len(r.hits) == 0:
                    break
                s = s.search_after()
class MultiSearch(MultiSearchBase[_R]):
    """
    Combine multiple :class:`~elasticsearch.dsl.Search` objects into a single
    request.
    """

    _using: UsingType

    if TYPE_CHECKING:

        def add(self, search: Search[_R]) -> Self: ...  # type: ignore[override]

    def execute(
        self, ignore_cache: bool = False, raise_on_error: bool = True
    ) -> List[Response[_R]]:
        """
        Execute the multi search request and return a list of search results.

        :arg ignore_cache: if set to ``True``, a new request is sent even when
            a previous result is stored on this object
        :arg raise_on_error: if set to ``True``, an ``ApiError`` is raised for
            the first item that reports an error; otherwise ``None`` is placed
            in the result list for that item
        """
        # reuse the stored result unless the caller asked for a fresh request
        if not ignore_cache and hasattr(self, "_response"):
            return self._response

        client = get_connection(self._using)
        raw = client.msearch(index=self._index, body=self.to_dict(), **self._params)

        results: List[Response[_R]] = []
        for search, item in zip(self._searches, raw["responses"]):
            failed = item.get("error", False)
            if failed and raise_on_error:
                raise ApiError("N/A", meta=raw.meta, body=item)
            # failed items are represented by None when errors are not raised
            wrapped: Any = None if failed else Response(search, item)
            results.append(wrapped)

        self._response = results
        return self._response
class EmptySearch(Search[_R]):
    """
    Search variant whose ``count()``, ``execute()``, ``scan()`` and
    ``delete()`` return fixed empty results.
    """

    def count(self) -> int:
        """Return ``0``."""
        return 0

    def execute(self, ignore_cache: bool = False) -> Response[_R]:
        """
        Return a response built from a body with no hits.

        :arg ignore_cache: accepted for signature compatibility; not used
        """
        empty_body: Dict[str, Any] = {"hits": {"total": 0, "hits": []}}
        return self._response_class(self, empty_body)

    def scan(self) -> Iterator[_R]:
        """Return a generator that yields nothing."""
        # delegating to an empty tuple keeps this a generator function
        yield from ()

    def delete(self) -> AttrDict[Any]:
        """Return an empty ``AttrDict``."""
        return AttrDict[Any]({})