elasticsearch/serializer.py (155 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import uuid
from datetime import date, datetime
from decimal import Decimal
from typing import Any, ClassVar, Dict, Tuple
from elastic_transport import JsonSerializer as _JsonSerializer
from elastic_transport import NdjsonSerializer as _NdjsonSerializer
from elastic_transport import Serializer as Serializer
from elastic_transport import TextSerializer as TextSerializer
from .exceptions import SerializationError
INTEGER_TYPES = ()
FLOAT_TYPES = (Decimal,)
TIME_TYPES = (date, datetime)
__all__ = [
"Serializer",
"JsonSerializer",
"TextSerializer",
"NdjsonSerializer",
"CompatibilityModeJsonSerializer",
"CompatibilityModeNdjsonSerializer",
"MapboxVectorTileSerializer",
]
try:
from elastic_transport import OrjsonSerializer as _OrjsonSerializer
__all__.append("OrjsonSerializer")
except ImportError:
_OrjsonSerializer = None # type: ignore[assignment,misc]
try:
import pyarrow as pa
__all__.append("PyArrowSerializer")
except ImportError:
pa = None
class JsonSerializer(_JsonSerializer):
mimetype: ClassVar[str] = "application/json"
def default(self, data: Any) -> Any:
if isinstance(data, TIME_TYPES):
# Little hack to avoid importing pandas but to not
# return 'NaT' string for pd.NaT as that's not a valid
# Elasticsearch date.
formatted_data = data.isoformat()
if formatted_data != "NaT":
return formatted_data
if isinstance(data, uuid.UUID):
return str(data)
elif isinstance(data, FLOAT_TYPES):
return float(data)
# This is kept for backwards compatibility even
# if 'INTEGER_TYPES' isn't used by default anymore.
elif INTEGER_TYPES and isinstance(data, INTEGER_TYPES):
return int(data)
# Special cases for numpy and pandas types
# These are expensive to import so we try them last.
serialized, value = _attempt_serialize_numpy_or_pandas(data)
if serialized:
return value
raise TypeError(f"Unable to serialize {data!r} (type: {type(data)})")
if _OrjsonSerializer is not None:
class OrjsonSerializer(JsonSerializer, _OrjsonSerializer):
def default(self, data: Any) -> Any:
return JsonSerializer.default(self, data)
class NdjsonSerializer(JsonSerializer, _NdjsonSerializer):
mimetype: ClassVar[str] = "application/x-ndjson"
def default(self, data: Any) -> Any:
return JsonSerializer.default(self, data)
class CompatibilityModeJsonSerializer(JsonSerializer):
mimetype: ClassVar[str] = "application/vnd.elasticsearch+json"
class CompatibilityModeNdjsonSerializer(NdjsonSerializer):
mimetype: ClassVar[str] = "application/vnd.elasticsearch+x-ndjson"
class MapboxVectorTileSerializer(Serializer):
mimetype: ClassVar[str] = "application/vnd.mapbox-vector-tile"
def loads(self, data: bytes) -> bytes:
return data
def dumps(self, data: bytes) -> bytes:
if isinstance(data, bytes):
return data
raise SerializationError(f"Cannot serialize {data!r} into a MapBox vector tile")
if pa is not None:
class PyArrowSerializer(Serializer):
"""PyArrow serializer for deserializing Arrow Stream data."""
mimetype: ClassVar[str] = "application/vnd.apache.arrow.stream"
def loads(self, data: bytes) -> pa.Table:
try:
with pa.ipc.open_stream(data) as reader:
return reader.read_all()
except pa.ArrowException as e:
raise SerializationError(
message=f"Unable to deserialize as Arrow stream: {data!r}",
errors=(e,),
)
def dumps(self, data: Any) -> bytes:
raise SerializationError(
message="Elasticsearch does not accept Arrow input data"
)
DEFAULT_SERIALIZERS: Dict[str, Serializer] = {
JsonSerializer.mimetype: JsonSerializer(),
MapboxVectorTileSerializer.mimetype: MapboxVectorTileSerializer(),
NdjsonSerializer.mimetype: NdjsonSerializer(),
CompatibilityModeJsonSerializer.mimetype: CompatibilityModeJsonSerializer(),
CompatibilityModeNdjsonSerializer.mimetype: CompatibilityModeNdjsonSerializer(),
}
if pa is not None:
DEFAULT_SERIALIZERS[PyArrowSerializer.mimetype] = PyArrowSerializer()
# Alias for backwards compatibility
JSONSerializer = JsonSerializer
def _attempt_serialize_numpy_or_pandas(data: Any) -> Tuple[bool, Any]:
"""Attempts to serialize a value from the numpy or pandas libraries.
This function is separate from JSONSerializer because the inner functions
are rewritten to be no-ops if either library isn't available to avoid
attempting to import and raising an ImportError over and over again.
Returns a tuple of (bool, Any) where the bool corresponds to whether
the second value contains a properly serialized value and thus
should be returned by JSONSerializer.default().
"""
serialized, value = _attempt_serialize_numpy(data)
if serialized:
return serialized, value
serialized, value = _attempt_serialize_pandas(data)
if serialized:
return serialized, value
return False, None
def _attempt_serialize_numpy(data: Any) -> Tuple[bool, Any]:
global _attempt_serialize_numpy
try:
import numpy as np
if isinstance(
data,
(
np.int_,
np.intc,
np.int8,
np.int16,
np.int32,
np.int64,
np.uint8,
np.uint16,
np.uint32,
np.uint64,
),
):
return True, int(data)
elif isinstance(
data,
(
np.float16,
np.float32,
np.float64,
),
):
return True, float(data)
elif isinstance(data, np.bool_):
return True, bool(data)
elif isinstance(data, np.datetime64):
return True, data.item().isoformat()
elif isinstance(data, np.ndarray):
return True, data.tolist()
except ImportError:
# Since we failed to import 'numpy' we don't want to try again.
_attempt_serialize_numpy = _attempt_serialize_noop
return False, None
def _attempt_serialize_pandas(data: Any) -> Tuple[bool, Any]:
global _attempt_serialize_pandas
try:
import pandas as pd
if isinstance(data, (pd.Series, pd.Categorical)):
return True, data.tolist()
elif isinstance(data, pd.Timestamp) and data is not getattr(pd, "NaT", None):
return True, data.isoformat()
elif data is getattr(pd, "NA", None):
return True, None
except ImportError:
# Since we failed to import 'pandas' we don't want to try again.
_attempt_serialize_pandas = _attempt_serialize_noop
return False, None
def _attempt_serialize_noop(data: Any) -> Tuple[bool, Any]: # noqa
# Short-circuit if the above functions can't import
# the corresponding library on the first attempt.
return False, None