# pai/serializers.py
# Copyright 2023 Alibaba, Inc. or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import urllib.request
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.error import HTTPError
import backoff
import numpy as np
from eas_prediction import pytorch_predict_pb2 as pt_pb
from eas_prediction import tf_request_pb2 as tf_pb
from .common.logging import get_logger
from .session import Session, get_default_session
logger = get_logger(__name__)
def _is_pil_image(data) -> bool:
try:
from PIL import Image
return isinstance(data, Image.Image)
except ImportError:
return False
def _is_numpy_ndarray(data) -> bool:
    # numpy is a hard dependency (imported at module top), so no import guard
    # is needed here, unlike the PIL and pandas checks.
    return isinstance(data, np.ndarray)
def _is_pandas_dataframe(data) -> bool:
try:
import pandas
return isinstance(data, pandas.DataFrame)
except ImportError:
return False
class TensorFlowIOSpec:
def __init__(self, name: str, shape: Tuple, data_type: tf_pb.ArrayDataType):
"""A class represents TensorFlow inputs/outputs spec.
Args:
name (str): Name for the spec.
shape (Tuple): The shape of the input/output value.
data_type (tf_pb.ArrayDataType): Data type of the value.
"""
self.name = name
self.shape = shape
self.data_type = data_type
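# Hedged sketch: a hand-built input spec matching the "flatten_input"
# signature example shown in TensorFlowSerializer.inspect_model_signature_def
# below; in normal use these specs are derived from the service signature.
def _example_tensorflow_io_spec() -> TensorFlowIOSpec:
    return TensorFlowIOSpec(
        name="flatten_input", shape=(-1, 28, 28), data_type=tf_pb.DT_FLOAT
    )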
class SerializerBase(ABC):
"""Abstract class for creating a Serializer class for predictor."""
@abstractmethod
def serialize(self, data) -> bytes:
"""Serialize the input data to bytes for transmitting."""
@abstractmethod
def deserialize(self, data: bytes):
"""Deserialize the data from raw bytes to Python object ."""
def inspect_from_service(
self, service_name: str, *, session: Optional[Session] = None
):
"""Inspect the online prediction service to complete the serializer instance
initialization.
The implementation of the `inspect_from_service` method is optional. You only
need to implement it if your serializer requires additional information from
service metadata or if it needs to send a request to the service in order to
be initialized.
"""
class BytesSerializer(SerializerBase):
"""A Serializer object that serialize input data into bytes format and deserialize
bytes formatted data into python object."""
def serialize(self, data) -> bytes:
if isinstance(data, (dict, list, tuple)):
return json.dumps(data).encode()
elif isinstance(data, str):
return data.encode()
elif isinstance(data, bytes):
return data
else:
return str(data).encode()
def deserialize(self, data: bytes):
return data
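# Hedged usage sketch: BytesSerializer encodes dict/list/str inputs as bytes
# for transmission and returns the raw response bytes untouched.
def _example_bytes_serializer_usage():
    serializer = BytesSerializer()
    payload = serializer.serialize({"prompt": "hello"})  # b'{"prompt": "hello"}'
    return serializer.deserialize(payload)  # raw bytes, unchanged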
class JsonSerializer(SerializerBase):
"""A Serializer object that serialize input data into JSON format and deserialize
JSON formatted data into python object."""
def serialize(self, data) -> bytes:
        if isinstance(data, str):
            # The input is assumed to already be a JSON string; encode it to
            # match the declared bytes return type.
            return data.encode()
if _is_pandas_dataframe(data):
data = data.to_numpy().tolist()
elif _is_numpy_ndarray(data):
data = data.tolist()
return json.dumps(data).encode()
def deserialize(self, data):
return json.loads(data)
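# Hedged usage sketch: JsonSerializer round-trips Python containers; numpy
# arrays and pandas DataFrames are converted to nested lists before encoding.
def _example_json_serializer_usage():
    serializer = JsonSerializer()
    payload = serializer.serialize({"features": np.arange(4).tolist()})
    # payload == b'{"features": [0, 1, 2, 3]}'
    return serializer.deserialize(payload)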
class TensorFlowSerializer(SerializerBase):
"""A Serializer class that responsible for transforming input/output data for
TensorFlow processor service."""
NUMPY_DATA_TYPE_MAPPING = {
"DT_FLOAT": np.float32,
"DT_DOUBLE": np.float64,
"DT_INT8": np.int8,
"DT_INT16": np.int16,
"DT_INT32": np.int32,
"DT_INT64": np.int64,
"DT_UINT8": np.uint8,
"DT_UINT16": np.uint16,
"DT_BOOL": np.bool_,
"DT_STRING": np.str_,
}
    def __init__(self):
        """TensorFlowSerializer initializer."""
        self._input_specs = []
        self._output_filter = []
        self._signature_name = None
        super().__init__()
def inspect_from_service(
self, service_name: str, *, session: Optional[Session] = None
):
"""Inspect the service to complete serializer instance initialization.
Args:
service_name (str): Name of the online prediction service.
session (:class:`pai.session.Session`): A PAI session instance used for
communicating with PAI services.
"""
session = session or get_default_session()
sig_def = self.inspect_model_signature_def(service_name, session=session)
self._init_from_signature_def(sig_def)
@classmethod
def inspect_model_signature_def(
        cls, service_name: str, *, session: Optional[Session] = None
) -> Dict[str, Any]:
"""Inspect the TensorFlow serving model signature by sending a request to
the service.
        The TensorFlow processor exposes an HTTP API on the prediction service
        that returns the model signature definition.
        Example API response::
{
"signature_name": "serving_default",
"inputs": [
{
"name": "flatten_input",
"shape": [
-1,
28,
28
],
"type": "DT_FLOAT"
}
],
"outputs": [
{
"name": "dense_1",
"shape": [
-1,
10
],
"type": "DT_FLOAT"
}
]
}
Returns:
A dictionary that represents the model signature definition.
"""
from pai.predictor import ServiceStatus
session = session or get_default_session()
service_api_object = session.service_api.get(service_name)
if service_api_object["Status"] != ServiceStatus.Running:
raise RuntimeError(
f"Service is not ready, cannot send request to the service to inspect "
f"model signature definition: "
f"name={service_api_object['ServiceName']} "
f"status={service_api_object['Status']} "
f"reason={service_api_object['Reason']} "
f"message={service_api_object['Message']}."
)
@backoff.on_exception(
backoff.expo,
exception=HTTPError,
max_tries=3,
max_time=10,
)
def _send_request():
request = urllib.request.Request(
url=service_api_object["InternetEndpoint"],
headers={
"Authorization": service_api_object["AccessToken"],
},
)
resp = urllib.request.urlopen(request)
return resp
resp = _send_request()
signature_def = json.load(resp)
return signature_def
def serialize(self, data: Union[Dict[str, Any], tf_pb.PredictRequest]) -> bytes:
if isinstance(data, tf_pb.PredictRequest):
return data.SerializeToString()
request = tf_pb.PredictRequest()
if self._output_filter:
for output_name in self._output_filter:
request.output_filter.append(output_name)
if not isinstance(data, dict):
if not self._input_specs or len(self._input_specs) > 1:
            raise ValueError(
                "TensorFlowSerializer requires a dictionary as input data "
                "(each value keyed by its input name) unless the model "
                "signature defines exactly one input."
            )
else:
# TensorFlow Processor expects key-value pairs for input data. However,
# if the input data is not a dictionary and the deployed model accepts
# exactly one input (by model signature), the key will be inferred
# from model signature and the current input data will be taken as
# value.
            value = np.asarray(data)
input_spec = self._input_specs[0]
if (
input_spec.shape
and len([dim for dim in input_spec.shape if dim == -1]) == 1
):
value = value.reshape(input_spec.shape)
data_type = (
input_spec.data_type
if input_spec and input_spec.data_type is not None
else self._np_dtype_to_tf_dtype(value.dtype.type)
)
self._put_value(
request=request,
name=input_spec.name,
data_type=data_type,
shape=value.shape,
data=np.ravel(value).tolist(),
)
else:
input_specs_dict = (
{input_spec.name: input_spec for input_spec in self._input_specs}
if self._input_specs
else {}
)
for name, value in data.items():
input_spec = input_specs_dict.get(name)
if not isinstance(value, np.ndarray):
value = np.asarray(value)
data_type = (
input_spec.data_type
if input_spec and input_spec.data_type is not None
else self._np_dtype_to_tf_dtype(value.dtype.type)
)
if (
input_spec
and input_spec.shape
and len([dim for dim in input_spec.shape if dim == -1]) == 1
):
value = value.reshape(input_spec.shape)
self._put_value(
request=request,
                name=name,  # use the dict key: input_spec may be None here
data_type=data_type,
shape=value.shape,
data=np.ravel(value).tolist(),
)
return request.SerializeToString()
    def _init_from_signature_def(self, signature_def):
        """Initialize the serializer in place from a signature def.
        Args:
            signature_def: Signature def returned by the PAI-EAS TensorFlow
                processor.
        """
inputs = signature_def["inputs"]
signature_def_key = signature_def["signature_name"]
input_specs = []
output_specs = []
for input_def in inputs:
data_type = tf_pb.ArrayDataType.Value(input_def["type"])
input_spec = TensorFlowIOSpec(
name=input_def["name"],
data_type=data_type,
                # drop the leading batch dimension from the signature shape
shape=input_def["shape"][1:],
)
input_specs.append(input_spec)
for output_def in signature_def["outputs"]:
data_type = tf_pb.ArrayDataType.Value(output_def["type"])
output_spec = TensorFlowIOSpec(
name=output_def["name"],
data_type=data_type,
shape=output_def["shape"],
)
output_specs.append(output_spec)
if not self._signature_name:
self._signature_name = signature_def_key
if not self._input_specs:
self._input_specs = input_specs
if not self._output_filter:
self._output_filter = [spec.name for spec in output_specs]
def deserialize(self, data: bytes):
response = tf_pb.PredictResponse()
response.ParseFromString(data)
output_names = response.outputs.keys()
results = {}
for name in output_names:
results[name] = self._get_value(
response=response,
name=name,
)
return results
def _np_dtype_to_tf_dtype(self, np_dtype):
rev_map = {value: key for key, value in self.NUMPY_DATA_TYPE_MAPPING.items()}
if np_dtype not in rev_map:
raise ValueError(
f"Numpy dtype {np_dtype} is not supported in TensorFlowSerializer."
)
return tf_pb.ArrayDataType.Value(rev_map[np_dtype])
def _tf_dtype_to_np_dtype(self, data_type):
data_type_name = tf_pb.ArrayDataType.Name(data_type)
if data_type_name not in self.NUMPY_DATA_TYPE_MAPPING:
raise ValueError(
f"Data type {data_type_name} is not supported in TensorFlowSerializer."
)
return self.NUMPY_DATA_TYPE_MAPPING.get(data_type_name)
def _put_value(
self, request: tf_pb.PredictRequest, name: str, data_type, shape, data
):
request.inputs[name].dtype = data_type
request.inputs[name].array_shape.dim.extend(shape)
integer_types = [
tf_pb.DT_INT8,
tf_pb.DT_INT16,
tf_pb.DT_INT32,
tf_pb.DT_UINT8,
tf_pb.DT_UINT16,
tf_pb.DT_QINT8,
tf_pb.DT_QINT16,
tf_pb.DT_QINT32,
tf_pb.DT_QUINT8,
tf_pb.DT_QUINT16,
]
if data_type == tf_pb.DT_FLOAT:
request.inputs[name].float_val.extend(data)
elif data_type == tf_pb.DT_DOUBLE:
request.inputs[name].double_val.extend(data)
elif data_type in integer_types:
request.inputs[name].int_val.extend(data)
elif data_type == tf_pb.DT_INT64:
request.inputs[name].int64_val.extend(data)
elif data_type == tf_pb.DT_BOOL:
request.inputs[name].bool_val.extend(data)
elif data_type == tf_pb.DT_STRING:
request.inputs[name].string_val.extend(data)
else:
raise ValueError(
f"Not supported input data type for TensorFlow PredictRequest: {data_type}"
)
    def _get_value(self, response: tf_pb.PredictResponse, name):
        # Check membership before indexing: reading a missing key of a
        # protobuf message map would insert an empty default entry.
        if name not in response.outputs:
            return
        output = response.outputs[name]
        if output.dtype == tf_pb.DT_INVALID:
            return
        np_dtype = self._tf_dtype_to_np_dtype(output.dtype)
        shape = list(output.array_shape.dim)
if output.dtype == tf_pb.DT_FLOAT:
return np.asarray(output.float_val, np_dtype).reshape(shape)
elif output.dtype in (tf_pb.DT_INT8, tf_pb.DT_INT16, tf_pb.DT_INT32):
return np.asarray(output.int_val, np_dtype).reshape(shape)
elif output.dtype == tf_pb.DT_INT64:
return np.asarray(output.int64_val, np_dtype).reshape(shape)
elif output.dtype == tf_pb.DT_DOUBLE:
return np.asarray(output.double_val, np_dtype).reshape(shape)
elif output.dtype == tf_pb.DT_STRING:
return np.asarray(output.string_val, np_dtype).reshape(shape)
elif output.dtype == tf_pb.DT_BOOL:
return np.asarray(output.bool_val, np_dtype).reshape(shape)
else:
raise ValueError(f"Not support data_type: {output.dtype}")
class PyTorchSerializer(SerializerBase):
"""A serializer responsible for transforming input/output data for PyTorch
processor service.
"""
NUMPY_DATA_TYPE_MAPPING = {
"DT_FLOAT": np.float32,
"DT_DOUBLE": np.float64,
"DT_INT8": np.int8,
"DT_INT16": np.int16,
"DT_INT32": np.int32,
"DT_INT64": np.int64,
"DT_UINT8": np.uint8,
"DT_UINT16": np.uint16,
"DT_BOOL": np.bool_,
"DT_STRING": np.str_,
}
    def __init__(self):
        self._output_filter = []
def _np_dtype_to_torch_dtype(self, np_dtype):
"""Get PredictRequest data_type from dtype of input np.ndarray."""
rev_map = {value: key for key, value in self.NUMPY_DATA_TYPE_MAPPING.items()}
if np_dtype not in rev_map:
raise ValueError(
f"Numpy dtype {np_dtype} is not supported in PyTorchSerializer."
)
return pt_pb.ArrayDataType.Value(rev_map[np_dtype])
def _torch_dtype_to_numpy_dtype(self, data_type):
data_type_name = pt_pb.ArrayDataType.Name(data_type)
if data_type_name not in self.NUMPY_DATA_TYPE_MAPPING:
raise ValueError(
f"Data type {data_type_name} is not supported in PyTorchSerializer."
)
return self.NUMPY_DATA_TYPE_MAPPING.get(data_type_name)
    def serialize(self, data: Union[np.ndarray, List, Tuple, bytes, str]) -> bytes:
request = pt_pb.PredictRequest()
if _is_pil_image(data):
data = np.asarray(data)
elif isinstance(data, (bytes, str)):
data = np.asarray(data)
if isinstance(data, np.ndarray):
            # if the input is an np.ndarray, we assume the prediction request
            # carries exactly one input.
self._put_value(
request,
index=0,
shape=data.shape,
data_type=self._np_dtype_to_torch_dtype(data.dtype.type),
data=np.ravel(data).tolist(),
)
        elif isinstance(data, (list, tuple)):
            # if the input is a list or tuple, we assume each item is a
            # separate input of the prediction request.
            for idx, item in enumerate(data):
                if not isinstance(item, np.ndarray):
                    item = np.asarray(item)
                # skip empty arrays; truth-testing an ndarray directly would
                # raise for multi-element arrays.
                if item.size == 0:
                    continue
                self._put_value(
                    request,
                    index=idx,
                    shape=item.shape,
                    data_type=self._np_dtype_to_torch_dtype(item.dtype.type),
                    data=np.ravel(item).tolist(),
                )
        else:
            raise ValueError(
                "PyTorchSerializer accepts an np.ndarray, a list/tuple of "
                "arrays, a PIL image, bytes, or str as input data."
            )
return request.SerializeToString()
def deserialize(self, data: bytes):
resp = pt_pb.PredictResponse()
resp.ParseFromString(data)
if len(resp.outputs) > 1:
results = []
            for idx in range(len(resp.outputs)):
results.append(self._get_value(resp, idx))
return results
elif len(resp.outputs) == 1:
return self._get_value(resp, index=0)
def _put_value(
self, request: pt_pb.PredictRequest, index: int, shape, data_type, data
):
while len(request.inputs) < index + 1:
request.inputs.add()
request.inputs[index].dtype = data_type
request.inputs[index].array_shape.dim.extend(shape)
if data_type == pt_pb.DT_FLOAT:
request.inputs[index].float_val.extend(data)
elif data_type == pt_pb.DT_DOUBLE:
request.inputs[index].double_val.extend(data)
elif data_type in (
pt_pb.DT_INT8,
pt_pb.DT_INT16,
pt_pb.DT_INT32,
pt_pb.DT_UINT8,
):
request.inputs[index].int_val.extend(data)
elif data_type == pt_pb.DT_INT64:
request.inputs[index].int64_val.extend(data)
else:
raise ValueError(f"Not supported PyTorch request data type: {data_type}")
def _get_value(self, response: pt_pb.PredictResponse, index: int):
output = response.outputs[index]
if output.dtype == pt_pb.DT_INVALID:
return
np_dtype = self._torch_dtype_to_numpy_dtype(output.dtype)
shape = list(output.array_shape.dim)
if output.dtype == pt_pb.DT_FLOAT:
return np.asarray(output.float_val, np_dtype).reshape(shape)
elif output.dtype in (
pt_pb.DT_INT8,
pt_pb.DT_INT16,
pt_pb.DT_INT32,
pt_pb.DT_UINT8,
):
return np.asarray(output.int_val, np_dtype).reshape(shape)
elif output.dtype == pt_pb.DT_INT64:
return np.asarray(output.int64_val, np_dtype).reshape(shape)
elif output.dtype == pt_pb.DT_DOUBLE:
return np.asarray(output.double_val, np_dtype).reshape(shape)
else:
            raise ValueError(
                f"Unsupported PyTorch response data type: {output.dtype}"
            )
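# Hedged usage sketch: a single ndarray becomes the sole request input, while
# a list/tuple maps each item to its own input tensor by position.
def _example_pytorch_serializer_usage():
    serializer = PyTorchSerializer()
    single = serializer.serialize(np.ones((2, 3), dtype=np.float32))
    multi = serializer.serialize(
        (np.ones((2, 3), dtype=np.float32), np.arange(4, dtype=np.int64))
    )
    return single, multi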