elasticsearch/dsl/wrappers.py (100 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import operator
from typing import (
TYPE_CHECKING,
Any,
Callable,
ClassVar,
Dict,
Literal,
Mapping,
Optional,
Tuple,
TypeVar,
Union,
cast,
)
if TYPE_CHECKING:
from _operator import _SupportsComparison
from typing_extensions import TypeAlias
from .utils import AttrDict
ComparisonOperators: TypeAlias = Literal["lt", "lte", "gt", "gte"]
RangeValT = TypeVar("RangeValT", bound="_SupportsComparison")
__all__ = ["Range"]
class Range(AttrDict[RangeValT]):
OPS: ClassVar[
Mapping[
ComparisonOperators,
Callable[["_SupportsComparison", "_SupportsComparison"], bool],
]
] = {
"lt": operator.lt,
"lte": operator.le,
"gt": operator.gt,
"gte": operator.ge,
}
def __init__(
self,
d: Optional[Dict[str, RangeValT]] = None,
/,
**kwargs: RangeValT,
):
if d is not None and (kwargs or not isinstance(d, dict)):
raise ValueError(
"Range accepts a single dictionary or a set of keyword arguments."
)
if d is None:
data = kwargs
else:
data = d
for k in data:
if k not in self.OPS:
raise ValueError(f"Range received an unknown operator {k!r}")
if "gt" in data and "gte" in data:
raise ValueError("You cannot specify both gt and gte for Range.")
if "lt" in data and "lte" in data:
raise ValueError("You cannot specify both lt and lte for Range.")
super().__init__(data)
def __repr__(self) -> str:
return "Range(%s)" % ", ".join("%s=%r" % op for op in self._d_.items())
def __contains__(self, item: object) -> bool:
if isinstance(item, str):
return super().__contains__(item)
item_supports_comp = any(hasattr(item, f"__{op}__") for op in self.OPS)
if not item_supports_comp:
return False
for op in self.OPS:
if op in self._d_ and not self.OPS[op](
cast("_SupportsComparison", item), self._d_[op]
):
return False
return True
@property
def upper(self) -> Union[Tuple[RangeValT, bool], Tuple[None, Literal[False]]]:
if "lt" in self._d_:
return self._d_["lt"], False
if "lte" in self._d_:
return self._d_["lte"], True
return None, False
@property
def lower(self) -> Union[Tuple[RangeValT, bool], Tuple[None, Literal[False]]]:
if "gt" in self._d_:
return self._d_["gt"], False
if "gte" in self._d_:
return self._d_["gte"], True
return None, False
class AggregationRange(AttrDict[Any]):
"""
:arg from: Start of the range (inclusive).
:arg key: Custom key to return the range with.
:arg to: End of the range (exclusive).
"""
def __init__(
self,
*,
from_: Any = None,
key: Optional[str] = None,
to: Any = None,
**kwargs: Any,
):
if from_ is not None:
kwargs["from_"] = from_
if key is not None:
kwargs["key"] = key
if to is not None:
kwargs["to"] = to
super().__init__(kwargs)