elastic_transport/_serializer.py (159 lines of code) (raw):
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import re
import uuid
from datetime import date
from decimal import Decimal
from typing import Any, ClassVar, Mapping, Optional
from ._exceptions import SerializationError
try:
import orjson
except ModuleNotFoundError:
orjson = None # type: ignore[assignment]
class Serializer:
    """Base interface shared by all serializers.

    Concrete serializers assign ``mimetype`` and override both
    :meth:`loads` and :meth:`dumps`.
    """

    # Declared for type checkers only; the base class assigns no value.
    mimetype: ClassVar[str]

    def dumps(self, data: Any) -> bytes:  # pragma: nocover
        """Serialize ``data`` into bytes; subclasses must override."""
        raise NotImplementedError

    def loads(self, data: bytes) -> Any:  # pragma: nocover
        """Deserialize ``data`` from bytes; subclasses must override."""
        raise NotImplementedError
class TextSerializer(Serializer):
    """Serializer converting between ``str`` and UTF-8 encoded bytes."""

    mimetype: ClassVar[str] = "text/*"

    def loads(self, data: bytes) -> str:
        """Decode ``data`` as UTF-8 text.

        A value that is already a ``str`` is handed back unchanged.

        :raises SerializationError: if decoding raises ``UnicodeError``.
        """
        if not isinstance(data, str):
            try:
                decoded = data.decode("utf-8", "surrogatepass")
            except UnicodeError as e:
                raise SerializationError(
                    f"Unable to deserialize as text: {data!r}", errors=(e,)
                )
            return decoded
        return data

    def dumps(self, data: str) -> bytes:
        """Encode ``data`` as UTF-8 bytes.

        :raises SerializationError: if ``data`` can't be encoded (for
            example because it isn't a string at all).
        """
        # A body that is already bytes is forwarded along untouched.
        if not isinstance(data, bytes):
            try:
                encoded = data.encode("utf-8", "surrogatepass")
            except (AttributeError, UnicodeError, TypeError) as e:
                raise SerializationError(
                    f"Unable to serialize to text: {data!r}", errors=(e,)
                )
            return encoded
        return data
class JsonSerializer(Serializer):
    """JSON serializer backed by the standard library ``json`` module."""

    mimetype: ClassVar[str] = "application/json"

    def default(self, data: Any) -> Any:
        """Convert values the JSON encoder can't handle on its own.

        ``date`` instances become ISO 8601 strings, ``uuid.UUID`` becomes
        its string form and ``Decimal`` becomes a ``float``.

        :raises SerializationError: for any other type.
        """
        if isinstance(data, date):
            return data.isoformat()
        if isinstance(data, uuid.UUID):
            return str(data)
        if isinstance(data, Decimal):
            return float(data)

        type_name = type(data).__name__
        raise SerializationError(
            message=f"Unable to serialize to JSON: {data!r} (type: {type_name})",
        )

    def json_dumps(self, data: Any) -> bytes:
        """Encode ``data`` as compact JSON and return it as UTF-8 bytes."""
        text = json.dumps(
            data, default=self.default, ensure_ascii=False, separators=(",", ":")
        )
        return text.encode("utf-8", "surrogatepass")

    def json_loads(self, data: bytes) -> Any:
        """Decode a JSON document with ``json.loads``."""
        return json.loads(data)

    def loads(self, data: bytes) -> Any:
        """Deserialize a JSON body, returning ``None`` for an empty one.

        :raises SerializationError: if the body isn't valid JSON.
        """
        # Some responses declare a JSON Content-Type yet carry no data;
        # return something for those instead of erroring.
        if data == b"":
            return None

        try:
            parsed = self.json_loads(data)
        except (ValueError, TypeError) as e:
            raise SerializationError(
                message=f"Unable to deserialize as JSON: {data!r}", errors=(e,)
            )
        return parsed

    def dumps(self, data: Any) -> bytes:
        """Serialize ``data`` to JSON bytes.

        ``bytes`` are passed through unchanged and ``str`` is only
        UTF-8 encoded; neither is JSON-encoded again.

        :raises SerializationError: if ``data`` can't be serialized.
        """
        # The body is already encoded, so forward it along as-is.
        if isinstance(data, bytes):
            return data
        if isinstance(data, str):
            return data.encode("utf-8", "surrogatepass")

        try:
            encoded = self.json_dumps(data)
        # This should be captured by the .default()
        # call but just in case we also wrap these.
        except (ValueError, UnicodeError, TypeError) as e:  # pragma: nocover
            type_name = type(data).__name__
            raise SerializationError(
                message=f"Unable to serialize to JSON: {data!r} (type: {type_name})",
                errors=(e,),
            )
        return encoded
if orjson is not None:

    class OrjsonSerializer(JsonSerializer):
        """JSON serializer relying on the orjson package.

        Only available if orjson is installed. It is faster, especially
        for vectors, but is also stricter.
        """

        def json_dumps(self, data: Any) -> bytes:
            """Encode ``data`` with ``orjson.dumps`` using ``OPT_SERIALIZE_NUMPY``."""
            encoded: bytes = orjson.dumps(
                data,
                default=self.default,
                option=orjson.OPT_SERIALIZE_NUMPY,
            )
            return encoded

        def json_loads(self, data: bytes) -> Any:
            """Decode a JSON document with ``orjson.loads``."""
            decoded: Any = orjson.loads(data)
            return decoded
class NdjsonSerializer(JsonSerializer):
    """Newline delimited JSON (NDJSON) serializer relying on the standard library json module."""

    mimetype: ClassVar[str] = "application/x-ndjson"

    def loads(self, data: bytes) -> Any:
        """Deserialize an NDJSON body into a list of documents.

        The body is split on ``\\n`` and ``\\r``; empty lines are skipped
        and every remaining line is decoded with :meth:`json_loads`.

        :param data: The NDJSON payload. Already-decoded ``str`` input is
            accepted as well and is UTF-8 encoded before splitting.
        :returns: A list with one decoded value per non-empty line.
        :raises SerializationError: if any line isn't valid JSON.
        """
        # The split pattern is bytes, and ``re`` refuses to apply a bytes
        # pattern to ``str``. Encode text up front so it is handled the
        # same way the text and JSON serializers handle it, rather than
        # failing with a bare TypeError.
        if isinstance(data, str):
            raw = data.encode("utf-8", "surrogatepass")
        else:
            raw = data

        ndjson = []
        for line in re.split(b"[\n\r]", raw):
            if not line:
                continue
            try:
                ndjson.append(self.json_loads(line))
            except (ValueError, TypeError) as e:
                raise SerializationError(
                    message=f"Unable to deserialize as NDJSON: {data!r}", errors=(e,)
                )
        return ndjson

    def dumps(self, data: Any) -> bytes:
        """Serialize an iterable of documents into an NDJSON body.

        :param data: An iterable whose items are each written as one line.
            ``str`` and ``bytes`` items are treated as pre-serialized and
            written verbatim; other items go through :meth:`json_dumps`.
            A single ``str`` or ``bytes`` value is treated as one
            pre-serialized body.
        :returns: The NDJSON payload, always terminated by a newline
            when at least one item was written.
        :raises SerializationError: if an item can't be serialized.
        """
        # The body is already encoded to bytes
        # so we forward the request body along.
        if isinstance(data, (bytes, str)):
            data = (data,)

        buffer = bytearray()
        for line in data:
            if isinstance(line, str):
                line = line.encode("utf-8", "surrogatepass")
            if isinstance(line, bytes):
                buffer += line
                # Ensure that there is always a final newline
                if not line.endswith(b"\n"):
                    buffer += b"\n"
            else:
                try:
                    buffer += self.json_dumps(line)
                    buffer += b"\n"
                # This should be captured by the .default()
                # call but just in case we also wrap these.
                except (ValueError, UnicodeError, TypeError) as e:  # pragma: nocover
                    raise SerializationError(
                        message=f"Unable to serialize to NDJSON: {data!r} (type: {type(data).__name__})",
                        errors=(e,),
                    )
        return bytes(buffer)
# Serializers used when a SerializerCollection isn't given its own
# mapping; each instance is registered under its class's mimetype.
DEFAULT_SERIALIZERS = {
    serializer.mimetype: serializer
    for serializer in (
        JsonSerializer(),
        TextSerializer(),
        NdjsonSerializer(),
    )
}
class SerializerCollection:
    """Collection of serializers that can be fetched by mimetype. Used by
    :class:`elastic_transport.Transport` to serialize and deserialize native
    Python types into bytes before passing to a node.
    """

    def __init__(
        self,
        serializers: Optional[Mapping[str, Serializer]] = None,
        default_mimetype: str = "application/json",
    ):
        """Build the collection.

        :param serializers: Mapping of mimetype to serializer. Falls back
            to ``DEFAULT_SERIALIZERS`` when ``None``.
        :param default_mimetype: Mimetype whose serializer is used when a
            call doesn't name one.
        :raises ValueError: if ``serializers`` has no entry for
            ``default_mimetype``.
        """
        if serializers is None:
            serializers = DEFAULT_SERIALIZERS

        try:
            self.default_serializer = serializers[default_mimetype]
        except KeyError:
            raise ValueError(
                f"Must configure a serializer for the default mimetype {default_mimetype!r}"
            ) from None

        # Copy so later changes to the caller's mapping don't leak in.
        self.serializers = dict(serializers)

    def dumps(self, data: Any, mimetype: Optional[str] = None) -> bytes:
        """Serialize ``data`` with the serializer selected for ``mimetype``."""
        return self.get_serializer(mimetype).dumps(data)

    def loads(self, data: bytes, mimetype: Optional[str] = None) -> Any:
        """Deserialize ``data`` with the serializer selected for ``mimetype``."""
        return self.get_serializer(mimetype).loads(data)

    def get_serializer(self, mimetype: Optional[str]) -> Serializer:
        """Return the serializer registered for ``mimetype``.

        Lookup order: the exact media type, then the
        ``'<supertype>/*'`` wildcard. ``None`` selects the default
        serializer.

        :param mimetype: A media type, optionally followed by parameters
            (e.g. ``'application/json; charset=utf-8'``), or ``None``.
        :raises SerializationError: if neither the exact type nor its
            wildcard is registered.
        """
        if mimetype is None:
            return self.default_serializer

        # Split out parameters such as the charset. Whitespace is allowed
        # around the ';' separator, so strip it as well; otherwise
        # 'application/json ; charset=utf-8' would be looked up with a
        # trailing space and never match.
        mimetype = mimetype.partition(";")[0].strip()

        try:
            return self.serializers[mimetype]
        except KeyError:
            pass

        # Try for '<mimetype-supertype>/*' types after the specific type fails.
        mimetype_supertype = mimetype.partition("/")[0]
        try:
            return self.serializers[f"{mimetype_supertype}/*"]
        except KeyError:
            raise SerializationError(
                f"Unknown mimetype, not able to serialize or deserialize: {mimetype}"
            ) from None