utils/templates/response.__init__.py.tpl (180 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from typing import ( TYPE_CHECKING, Any, Dict, Generic, Iterator, List, Mapping, Optional, Sequence, Tuple, Union, cast, ) from ..utils import _R, AttrDict, AttrList, _wrap from .hit import Hit, HitMeta if TYPE_CHECKING: from ..aggs import Agg from ..faceted_search_base import FacetedSearchBase from ..search_base import Request, SearchBase from ..update_by_query_base import UpdateByQueryBase from .. import types __all__ = ["Response", "AggResponse", "UpdateByQueryResponse", "Hit", "HitMeta", "AggregateResponseType"] class Response(AttrDict[Any], Generic[_R]): """An Elasticsearch search response. {% for arg in response.args %} {% for line in arg.doc %} {{ line }} {% endfor %} {% endfor %} """ _search: "SearchBase[_R]" _faceted_search: "FacetedSearchBase[_R]" _doc_class: Optional[_R] _hits: List[_R] {% for arg in response.args %} {% if arg.name not in ["hits", "aggregations"] %} {{ arg.name }}: {{ arg.type }} {% endif %} {% endfor %} def __init__( self, search: "Request[_R]", response: Dict[str, Any], doc_class: Optional[_R] = None, ): super(AttrDict, self).__setattr__("_search", search) super(AttrDict, self).__setattr__("_doc_class", doc_class) super().__init__(response) def __iter__(self) -> Iterator[_R]: # type: ignore[override] return iter(self.hits) def __getitem__(self, key: Union[slice, int, str]) -> Any: if isinstance(key, (slice, int)): # for slicing etc return self.hits[key] return super().__getitem__(key) def __nonzero__(self) -> bool: return bool(self.hits) __bool__ = __nonzero__ def __repr__(self) -> str: return "<Response: %r>" % (self.hits or self.aggregations) def __len__(self) -> int: return len(self.hits) def __getstate__(self) -> Tuple[Dict[str, Any], "Request[_R]", Optional[_R]]: # type: ignore[override] return self._d_, self._search, self._doc_class def __setstate__( self, state: Tuple[Dict[str, Any], "Request[_R]", Optional[_R]] # type: ignore[override] ) -> None: super(AttrDict, self).__setattr__("_d_", state[0]) super(AttrDict, self).__setattr__("_search", state[1]) super(AttrDict, self).__setattr__("_doc_class", state[2]) def success(self) -> bool: return self._shards.total == self._shards.successful and not self.timed_out @property def hits(self) -> List[_R]: if not hasattr(self, "_hits"): h = cast(AttrDict[Any], self._d_["hits"]) try: hits = AttrList(list(map(self._search._get_result, h["hits"]))) except AttributeError as e: # avoid raising AttributeError since it will be hidden by the property raise TypeError("Could not parse hits.", e) # avoid assigning _hits into self._d_ super(AttrDict, self).__setattr__("_hits", hits) for k in h: setattr(self._hits, k, _wrap(h[k])) return self._hits @property def aggregations(self) -> "AggResponse[_R]": return self.aggs @property def aggs(self) -> "AggResponse[_R]": if not hasattr(self, "_aggs"): aggs = AggResponse[_R]( cast("Agg[_R]", self._search.aggs), self._search, cast(Dict[str, Any], self._d_.get("aggregations", {})), ) # avoid assigning _aggs into self._d_ super(AttrDict, self).__setattr__("_aggs", aggs) return cast("AggResponse[_R]", self._aggs) def search_after(self) -> "SearchBase[_R]": """ Return a ``Search`` instance that retrieves the next page of results. This method provides an easy way to paginate a long list of results using the ``search_after`` option. For example:: page_size = 20 s = Search()[:page_size].sort("date") while True: # get a page of results r = await s.execute() # do something with this page of results # exit the loop if we reached the end if len(r.hits) < page_size: break # get a search object with the next page of results s = r.search_after() Note that the ``search_after`` option requires the search to have an explicit ``sort`` order. """ if len(self.hits) == 0: raise ValueError("Cannot use search_after when there are no search results") if not hasattr(self.hits[-1].meta, "sort"): # type: ignore raise ValueError("Cannot use search_after when results are not sorted") return self._search.extra(search_after=self.hits[-1].meta.sort) # type: ignore AggregateResponseType = {{ response["aggregate_type"] }} class AggResponse(AttrDict[Any], Generic[_R]): """An Elasticsearch aggregation response.""" _meta: Dict[str, Any] def __init__(self, aggs: "Agg[_R]", search: "Request[_R]", data: Dict[str, Any]): super(AttrDict, self).__setattr__("_meta", {"search": search, "aggs": aggs}) super().__init__(data) def __getitem__(self, attr_name: str) -> AggregateResponseType: if attr_name in self._meta["aggs"]: # don't do self._meta['aggs'][attr_name] to avoid copying agg = self._meta["aggs"].aggs[attr_name] return cast(AggregateResponseType, agg.result(self._meta["search"], self._d_[attr_name])) return super().__getitem__(attr_name) # type: ignore def __iter__(self) -> Iterator[AggregateResponseType]: # type: ignore[override] for name in self._meta["aggs"]: yield self[name] class UpdateByQueryResponse(AttrDict[Any], Generic[_R]): """An Elasticsearch update by query response. {% for arg in ubq_response.args %} {% for line in arg.doc %} {{ line }} {% endfor %} {% endfor %} """ _search: "UpdateByQueryBase[_R]" {% for arg in ubq_response.args %} {{ arg.name }}: {{ arg.type }} {% endfor %} def __init__( self, search: "Request[_R]", response: Dict[str, Any], doc_class: Optional[_R] = None, ): super(AttrDict, self).__setattr__("_search", search) super(AttrDict, self).__setattr__("_doc_class", doc_class) super().__init__(response) def success(self) -> bool: return not self.timed_out and not self.failures