# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import date, datetime
from fnmatch import fnmatch
from typing import (
TYPE_CHECKING,
Any,
Callable,
ClassVar,
Dict,
Generic,
List,
Optional,
Tuple,
TypeVar,
Union,
get_args,
overload,
)
try:
from types import UnionType
except ImportError:
UnionType = None # type: ignore[assignment, misc]
from typing_extensions import dataclass_transform
from .exceptions import ValidationException
from .field import Binary, Boolean, Date, Field, Float, Integer, Nested, Object, Text
from .mapping import Mapping
from .utils import DOC_META_FIELDS, ObjectBase
if TYPE_CHECKING:
from elastic_transport import ObjectApiResponse
from .index_base import IndexBase
class MetaField:
def __init__(self, *args: Any, **kwargs: Any):
self.args, self.kwargs = args, kwargs
class InstrumentedField:
"""Proxy object for a mapped document field.
An object of this instance is returned when a field is accessed as a class
attribute of a ``Document`` or ``InnerDoc`` subclass. These objects can
be used in any situation in which a reference to a field is required, such
as when specifying sort options in a search::
class MyDocument(Document):
name: str
s = MyDocument.search()
s = s.sort(-MyDocument.name) # sort by name in descending order
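
    Sub-fields of ``Object`` and ``Nested`` fields can be referenced with
    chained attribute access as well. As a sketch, assuming ``MyDocument``
    also declared a hypothetical ``address`` object field with a ``city``
    sub-field::

        s = s.sort(+MyDocument.address.city)  # ascending sort on "address.city"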
"""
def __init__(self, name: str, field: Field):
self._name = name
self._field = field
# note that the return value type here assumes classes will only be used to
# access fields (I haven't found a way to make this type dynamic based on a
# decision taken at runtime)
def __getattr__(self, attr: str) -> "InstrumentedField":
try:
# first let's see if this is an attribute of this object
return super().__getattribute__(attr) # type: ignore[no-any-return]
except AttributeError:
try:
# next we see if we have a sub-field with this name
return InstrumentedField(f"{self._name}.{attr}", self._field[attr])
except KeyError:
# lastly we let the wrapped field resolve this attribute
return getattr(self._field, attr) # type: ignore[no-any-return]
def __pos__(self) -> str:
"""Return the field name representation for ascending sort order"""
return f"{self._name}"
def __neg__(self) -> str:
"""Return the field name representation for descending sort order"""
return f"-{self._name}"
def __str__(self) -> str:
return self._name
def __repr__(self) -> str:
return f"InstrumentedField[{self._name}]"
class DocumentMeta(type):
_doc_type: "DocumentOptions"
_index: "IndexBase"
def __new__(
cls, name: str, bases: Tuple[type, ...], attrs: Dict[str, Any]
) -> "DocumentMeta":
        # DocumentOptions filters attrs in place (mapped field definitions are
        # removed before the class is created)
attrs["_doc_type"] = DocumentOptions(name, bases, attrs)
return super().__new__(cls, name, bases, attrs)
def __getattr__(cls, attr: str) -> Any:
if attr in cls._doc_type.mapping:
return InstrumentedField(attr, cls._doc_type.mapping[attr])
return super().__getattribute__(attr)
class DocumentOptions:
type_annotation_map = {
int: (Integer, {}),
float: (Float, {}),
bool: (Boolean, {}),
str: (Text, {}),
bytes: (Binary, {}),
datetime: (Date, {}),
date: (Date, {"format": "yyyy-MM-dd"}),
}
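
    # Illustrative sketch of how the map above is applied by ``__init__``
    # below (the ``Post`` class and its fields are hypothetical):
    #
    #     class Post(Document):
    #         title: str            # becomes Text(required=True)
    #         views: Optional[int]  # becomes Integer(required=False)
    #         tags: List[str]       # becomes Text(multi=True, required=False)
    #         published: date       # becomes Date(format="yyyy-MM-dd", required=True)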
def __init__(self, name: str, bases: Tuple[type, ...], attrs: Dict[str, Any]):
meta = attrs.pop("Meta", None)
# create the mapping instance
self.mapping: Mapping = getattr(meta, "mapping", Mapping())
# register the document's fields, which can be given in a few formats:
#
# class MyDocument(Document):
# # required field using native typing
# # (str, int, float, bool, datetime, date)
# field1: str
#
# # optional field using native typing
# field2: Optional[datetime]
#
# # array field using native typing
# field3: list[int]
#
# # sub-object, same as Object(MyInnerDoc)
# field4: MyInnerDoc
#
# # nested sub-objects, same as Nested(MyInnerDoc)
# field5: list[MyInnerDoc]
#
# # use typing, but override with any stock or custom field
# field6: bool = MyCustomField()
#
# # best mypy and pyright support and dataclass-like behavior
# field7: M[date]
# field8: M[str] = mapped_field(MyCustomText(), default="foo")
#
# # legacy format without Python typing
# field9 = Text()
#
# # ignore attributes
        #   field10: ClassVar[str] = "a regular class variable"
annotations = attrs.get("__annotations__", {})
fields = set([n for n in attrs if isinstance(attrs[n], Field)])
fields.update(annotations.keys())
field_defaults = {}
for name in fields:
value: Any = None
required = None
multi = None
if name in annotations:
# the field has a type annotation, so next we try to figure out
# what field type we can use
type_ = annotations[name]
skip = False
required = True
multi = False
while hasattr(type_, "__origin__"):
if type_.__origin__ == ClassVar:
skip = True
break
elif type_.__origin__ == Mapped:
# M[type] -> extract the wrapped type
type_ = type_.__args__[0]
elif type_.__origin__ == Union:
if len(type_.__args__) == 2 and type_.__args__[1] is type(None):
# Optional[type] -> mark instance as optional
required = False
type_ = type_.__args__[0]
else:
raise TypeError("Unsupported union")
elif type_.__origin__ in [list, List]:
# List[type] -> mark instance as multi
multi = True
required = False
type_ = type_.__args__[0]
else:
break
if skip or type_ == ClassVar:
# skip ClassVar attributes
continue
if type(type_) is UnionType:
# a union given with the pipe syntax
args = get_args(type_)
if len(args) == 2 and args[1] is type(None):
required = False
type_ = type_.__args__[0]
else:
raise TypeError("Unsupported union")
field = None
field_args: List[Any] = []
field_kwargs: Dict[str, Any] = {}
if isinstance(type_, type) and issubclass(type_, InnerDoc):
# object or nested field
field = Nested if multi else Object
field_args = [type_]
elif type_ in self.type_annotation_map:
# use best field type for the type hint provided
field, field_kwargs = self.type_annotation_map[type_] # type: ignore[assignment]
if field:
field_kwargs = {
"multi": multi,
"required": required,
**field_kwargs,
}
value = field(*field_args, **field_kwargs)
if name in attrs:
# this field has a right-side value, which can be field
# instance on its own or wrapped with mapped_field()
attr_value = attrs[name]
if isinstance(attr_value, dict):
# the mapped_field() wrapper function was used so we need
# to look for the field instance and also record any
# dataclass-style defaults
attr_value = attrs[name].get("_field")
default_value = attrs[name].get("default") or attrs[name].get(
"default_factory"
)
if default_value:
field_defaults[name] = default_value
if attr_value:
value = attr_value
if required is not None:
value._required = required
if multi is not None:
value._multi = multi
if value is None:
raise TypeError(f"Cannot map field {name}")
self.mapping.field(name, value)
if name in attrs:
del attrs[name]
# store dataclass-style defaults for ObjectBase.__init__ to assign
attrs["_defaults"] = field_defaults
# add all the mappings for meta fields
for name in dir(meta):
if isinstance(getattr(meta, name, None), MetaField):
params = getattr(meta, name)
self.mapping.meta(name, *params.args, **params.kwargs)
# document inheritance - include the fields from parents' mappings
for b in bases:
if hasattr(b, "_doc_type") and hasattr(b._doc_type, "mapping"):
self.mapping.update(b._doc_type.mapping, update_only=True)
@property
def name(self) -> str:
return self.mapping.properties.name
_FieldType = TypeVar("_FieldType")
class Mapped(Generic[_FieldType]):
"""Class that represents the type of a mapped field.
This class can be used as an optional wrapper on a field type to help type
checkers assign the correct type when the field is used as a class
attribute.
Consider the following definitions::
class MyDocument(Document):
first: str
second: M[str]
mydoc = MyDocument(first="1", second="2")
Type checkers have no trouble inferring the type of both ``mydoc.first``
and ``mydoc.second`` as ``str``, but while ``MyDocument.first`` will be
incorrectly typed as ``str``, ``MyDocument.second`` should be assigned the
correct ``InstrumentedField`` type.
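
    As a sketch of the practical difference, the class-level attribute can be
    passed wherever a field reference is expected, for example as a sort
    option, without upsetting the type checker::

        s = MyDocument.search().sort(-MyDocument.second)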
"""
__slots__: Dict[str, Any] = {}
if TYPE_CHECKING:
@overload
def __get__(self, instance: None, owner: Any) -> InstrumentedField: ...
@overload
def __get__(self, instance: object, owner: Any) -> _FieldType: ...
def __get__(
self, instance: Optional[object], owner: Any
) -> Union[InstrumentedField, _FieldType]: ...
def __set__(self, instance: Optional[object], value: _FieldType) -> None: ...
def __delete__(self, instance: Any) -> None: ...
M = Mapped
def mapped_field(
field: Optional[Field] = None,
*,
init: bool = True,
default: Any = None,
default_factory: Optional[Callable[[], Any]] = None,
**kwargs: Any,
) -> Any:
"""Construct a field using dataclass behaviors
This function can be used in the right side of a document field definition
as a wrapper for the field instance or as a way to provide dataclass-compatible
options.
:param field: The instance of ``Field`` to use for this field. If not provided,
an instance that is appropriate for the type given to the field is used.
:param init: a value of ``True`` adds this field to the constructor, and a
value of ``False`` omits it from it. The default is ``True``.
:param default: a default value to use for this field when one is not provided
explicitly.
:param default_factory: a callable that returns a default value for the field,
when one isn't provided explicitly. Only one of ``factory`` and
``default_factory`` can be used.
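
    As a sketch (the ``Post`` class and its fields are hypothetical)::

        class Post(Document):
            title: M[str] = mapped_field(Text(), default="")
            created: M[datetime] = mapped_field(default_factory=datetime.now)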
"""
return {
"_field": field,
"init": init,
"default": default,
"default_factory": default_factory,
**kwargs,
}
@dataclass_transform(field_specifiers=(mapped_field,))
class InnerDoc(ObjectBase, metaclass=DocumentMeta):
"""
Common class for inner documents like Object or Nested
"""
@classmethod
def from_es(
cls,
data: Union[Dict[str, Any], "ObjectApiResponse[Any]"],
data_only: bool = False,
) -> "InnerDoc":
if data_only:
data = {"_source": data}
return super().from_es(data)
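
    # Illustrative note: ``data_only=True`` allows an inner document to be
    # built from a bare dictionary instead of a full hit, e.g. (with a
    # hypothetical ``Comment`` subclass of ``InnerDoc``):
    #
    #     comment = Comment.from_es({"author": "jane", "content": "hi"}, data_only=True)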
class DocumentBase(ObjectBase):
"""
Model-like class for persisting documents in elasticsearch.
"""
@classmethod
def _matches(cls, hit: Dict[str, Any]) -> bool:
if cls._index._name is None:
return True
return fnmatch(hit.get("_index", ""), cls._index._name)
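
    # For example (hypothetical names): if the document class is bound to an
    # index pattern such as "blog-*", a hit coming from "blog-2024" matches,
    # while a hit from "users" does not; with no index name set, every hit
    # matches.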
@classmethod
def _default_index(cls, index: Optional[str] = None) -> str:
return index or cls._index._name
def _get_index(
self, index: Optional[str] = None, required: bool = True
) -> Optional[str]:
if index is None:
index = getattr(self.meta, "index", None)
if index is None:
index = getattr(self._index, "_name", None)
if index is None and required:
raise ValidationException("No index")
if index and "*" in index:
raise ValidationException("You cannot write to a wildcard index.")
return index
def __repr__(self) -> str:
return "{}({})".format(
self.__class__.__name__,
", ".join(
f"{key}={getattr(self.meta, key)!r}"
for key in ("index", "id")
if key in self.meta
),
)
def to_dict(self, include_meta: bool = False, skip_empty: bool = True) -> Dict[str, Any]: # type: ignore[override]
"""
        Serialize the instance into a dictionary so that it can be saved in elasticsearch.

        :arg include_meta: if set to ``True``, the metadata fields (``_index``,
            ``_id``, etc.) are included and the document's data is nested under
            a ``_source`` key. Otherwise just the document's data is
            serialized. This is useful when passing multiple instances into
            ``elasticsearch.helpers.bulk``.
        :arg skip_empty: if set to ``False``, empty values (``None``, ``[]``,
            ``{}``) are left on the document. Otherwise they are stripped out,
            as they make no difference in elasticsearch.
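
        For example, a minimal sketch of bulk indexing (``client`` and
        ``docs`` are assumed to already exist)::

            from elasticsearch.helpers import bulk

            bulk(client, (doc.to_dict(include_meta=True) for doc in docs))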
"""
d = super().to_dict(skip_empty=skip_empty)
if not include_meta:
return d
meta = {"_" + k: self.meta[k] for k in DOC_META_FIELDS if k in self.meta}
# in case of to_dict include the index unlike save/update/delete
index = self._get_index(required=False)
if index is not None:
meta["_index"] = index
meta["_source"] = d
return meta