odps/serializers.py (700 lines of code) (raw):
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import email.header
import inspect
import json
import re
from collections import OrderedDict
from xml.dom import minidom
import requests
from . import utils
from .compat import BytesIO, ElementTree, long_type, six
def _route_xml_path(root, *keys, **kw):
create_if_not_exists = kw.get("create_if_not_exists", False)
if isinstance(root, six.string_types):
root = ElementTree.fromstring(root)
for key in keys:
if key == ".":
return root
prev = root
root = root.find(key)
if root is None:
if not create_if_not_exists:
return
root = ElementTree.Element(key)
prev.append(root)
return root
def _extract_encoded_json(content):
if not content.startswith("=?"):
return json.loads(content)
content, encoding = email.header.decode_header(content)[0]
return json.loads(content.decode(encoding))
def _route_json_path(root, *keys, **kw):
create_if_not_exists = kw.get("create_if_not_exists", False)
if isinstance(root, six.string_types):
root = _extract_encoded_json(root)
for key in keys:
prev = root
root = root.get(key)
if root is None:
if not create_if_not_exists:
return
root = prev[key] = OrderedDict()
return root
def parse_ndarray(array):
try:
import numpy as np
return np.asarray(array)
except ImportError:
return array
def serialize_ndarray(array):
try:
return array.tolist()
except AttributeError:
return array
def none_or(func):
def new_func(x):
if x is None:
return x
return func(x)
return new_func
_serialize_types = dict()
_serialize_types["bool"] = (none_or(utils.str_to_bool), none_or(utils.bool_to_str))
_serialize_types["json"] = (none_or(json.loads), none_or(json.dumps))
_serialize_types["rfc822"] = (none_or(utils.parse_rfc822), none_or(utils.gen_rfc822))
_serialize_types["rfc822l"] = (
none_or(utils.parse_rfc822),
none_or(lambda s: utils.gen_rfc822(s, localtime=True)),
)
_serialize_types["ndarray"] = (none_or(parse_ndarray), none_or(serialize_ndarray))
_serialize_types["timestamp_ms"] = (
none_or(lambda x: datetime.datetime.fromtimestamp(long_type(x) / 1000.0)),
none_or(lambda x: long_type(x.timestamp() * 1000)),
)
_serialize_types["int"] = (none_or(long_type), none_or(str))
_serialize_types["float"] = (none_or(float), none_or(str))
class SerializeField(object):
def __init__(self, *keys, **kwargs):
self._path_keys = keys
self._required = kwargs.get("required", False) # used when serialized
self._blank_if_null = kwargs.get(
"blank_if_null", True if self._required else False
)
self._default = kwargs.get("default")
if "type" in kwargs:
self._parse_callback, self._serialize_callback = _serialize_types[
kwargs.pop("type")
]
else:
self._parse_callback = kwargs.get("parse_callback")
self._serialize_callback = kwargs.get("serialize_callback")
self.set_to_parent = kwargs.get("set_to_parent", False)
def _to_str(self, val):
if isinstance(val, six.string_types):
return utils.to_str(val)
return val
def _set_default_keys(self, *keys):
if not self._path_keys:
self._path_keys = keys
def parse(self, root, **kwargs):
raise NotImplementedError
def serialize(self, root, value):
raise NotImplementedError
class HasSubModelField(SerializeField):
def __init__(self, model, *args, **kwargs):
if isinstance(model, six.string_types):
self._model_cls = None
self._model_str = model
else:
self._model_cls = model
self._model_str = None
super(HasSubModelField, self).__init__(*args, **kwargs)
@property
def _model(self):
if self._model_cls is not None:
return self._model_cls
models = self._model_str.split(".")
model_name = models[0]
module = None
for stack in inspect.stack():
globs = stack[0].f_globals
if model_name in globs:
possible_module = globs[model_name]
if inspect.isclass(possible_module) and issubclass(
possible_module, SerializableModel
):
module = possible_module
break
if module is None:
raise ValueError("Unknown model name: %s" % self._model_str)
res = None
for model in models[1:]:
if res is None:
res = getattr(module, model)
else:
res = getattr(res, model)
self._model_cls = res
return res
_default_name_maker = dict(
capitalized=utils.underline_to_capitalized,
raw=lambda v: v,
camel=utils.underline_to_camel,
)
class SerializableModelMetaClass(type):
def __new__(mcs, name, bases, kv):
slots = []
fields = dict()
for base in bases:
base_slots = list(getattr(base, "__slots__", []))
if "__weakref__" in base_slots:
base_slots.remove("__weakref__")
slots.extend(base_slots)
fields.update(getattr(base, "__fields", dict()))
slots.extend(kv.get("__slots__", []))
fields.update(kv.get("__fields", dict()))
attrs = []
parent_attrs = []
def_name = kv.pop("_" + name + "__default_name", "capitalized")
for attr, field in (
pair for pair in six.iteritems(kv) if not pair[0].startswith("__")
):
if inspect.isclass(field) and issubclass(field, SerializeField):
field = field()
if isinstance(field, SerializeField):
field._set_default_keys(_default_name_maker[def_name](attr))
if not field.set_to_parent:
slots.append(attr)
attrs.append(attr)
if field.set_to_parent:
parent_attrs.append(attr)
fields[attr] = field
kv["_parent_attrs"] = set(parent_attrs)
slots = tuple(OrderedDict.fromkeys(slots))
slots_pos = dict([(v, k) for k, v in enumerate(slots)])
fields = OrderedDict(
sorted(
six.iteritems(fields), key=lambda s: slots_pos.get(s[0], float("inf"))
)
)
for attr in attrs:
if attr in kv:
del kv[attr]
slots = tuple(slot for slot in slots if slot not in kv)
if len(slots) > 0:
kv["__slots__"] = slots
if len(fields) > 0:
kv["__fields"] = fields
return type.__new__(mcs, name, bases, kv)
class SerializableModel(six.with_metaclass(SerializableModelMetaClass)):
__slots__ = "_parent", "__weakref__"
def __init__(self, **kwargs):
slots = getattr(self, "__slots__", [])
for k, v in six.iteritems(kwargs):
if k in slots:
setattr(self, k, v)
for attr in slots:
try:
super(SerializableModel, self).__getattribute__(attr)
except AttributeError:
setattr(self, attr, None)
@property
def parent(self):
return self._parent
@classmethod
def _is_null(cls, v):
if v is None:
return True
if isinstance(v, (list, dict)) and len(v) == 0:
return True
return False
@classmethod
def _setattr(cls, obj, k, v, skip_null=True):
if cls._is_null(v) and object.__getattribute__(obj, k) is not None:
if not skip_null:
setattr(obj, k, v)
return
fields = getattr(type(obj), "__fields")
if not isinstance(fields[k], HasSubModelField):
setattr(obj, k, v)
elif isinstance(v, list):
setattr(obj, k, v)
else:
sub_obj = object.__getattribute__(obj, k)
new_obj = v
if sub_obj is None:
setattr(obj, k, v)
return
sub_fields = getattr(new_obj, "__fields", {})
for k in six.iterkeys(sub_fields):
if sub_fields[k].set_to_parent is True:
continue
cls._setattr(
sub_obj, k, object.__getattribute__(new_obj, k), skip_null=skip_null
)
@classmethod
def _init_obj(cls, content, obj=None, **kw):
fields = dict(getattr(cls, "__fields"))
_type = getattr(cls, "_type_indicator", None)
_name = "name" if "name" in fields else None
if obj is None and (_type is not None or "name" in fields):
kwargs = dict(kw)
for field in (_name, _type):
if field is None:
continue
typo = fields[field].parse(content, **kw)
kwargs[field] = typo
obj = cls(**kwargs)
return obj or cls(**kw)
@classmethod
def deserial(cls, content, obj=None, **kw):
obj = cls._init_obj(content, obj=obj, **kw)
obj_type = type(obj)
fields = dict(getattr(obj_type, "__fields"))
if isinstance(content, six.string_types):
if issubclass(obj_type, XMLSerializableModel):
content = ElementTree.fromstring(content)
else:
content = _extract_encoded_json(content)
parent_kw = dict()
self_kw = dict()
for attr, prop in six.iteritems(fields):
if isinstance(prop, SerializeField):
kwargs = dict(kw)
if isinstance(prop, HasSubModelField):
kwargs["_parent"] = obj
if not prop.set_to_parent:
self_kw[attr] = prop.parse(content, **kwargs)
else:
parent_kw[attr] = prop.parse(content, **kwargs)
for k, v in six.iteritems(self_kw):
obj_type._setattr(obj, k, v, skip_null=getattr(obj_type, "skip_null", True))
if obj.parent is not None:
for k, v in six.iteritems(parent_kw):
# remember that do not use `hasattr` here
try:
old_v = object.__getattribute__(obj.parent, k)
except AttributeError:
continue
if v is not None and old_v != v:
setattr(obj.parent, k, v)
return obj
def serial(self):
if isinstance(self, XMLSerializableModel):
assert self._root is not None
root = ElementTree.Element(self._root)
else:
root = OrderedDict()
for attr, prop in six.iteritems(getattr(self, "__fields")):
if isinstance(prop, SerializeField):
try:
prop.serialize(root, object.__getattribute__(self, attr))
except NotImplementedError:
continue
return root
def extract(self, **base_kw):
kwargs = base_kw.copy()
for attr in self.__slots__:
try:
kwargs[attr] = object.__getattribute__(self, attr)
except AttributeError:
pass
return kwargs
class XMLSerializableModel(SerializableModel):
__slots__ = ("_root",)
@classmethod
def parse(cls, response, obj=None, **kw):
if "parent" in kw:
kw["_parent"] = kw.pop("parent")
if isinstance(response, requests.Response):
# PY2 prefer bytes, while PY3 prefer str
response = response.content.decode() if six.PY3 else response.content
return cls.deserial(response, obj=obj, **kw)
def serialize(self):
root = self.serial()
sio = BytesIO()
ElementTree.ElementTree(root).write(sio, encoding="utf-8", xml_declaration=True)
xml_content = sio.getvalue()
prettified_xml = minidom.parseString(xml_content).toprettyxml(
indent=" " * 2, encoding="utf-8"
)
prettified_xml = utils.to_text(prettified_xml, encoding="utf-8")
cdata_re = re.compile(r"<!\[CDATA\[.*\]\]>", (re.M | re.S))
for src_cdata in cdata_re.finditer(prettified_xml):
src_cdata = src_cdata.group(0)
dest_cdata = (
src_cdata.replace("&", "&")
.replace("<", "<")
.replace(""", '"')
.replace(">", ">")
)
prettified_xml = prettified_xml.replace(src_cdata, dest_cdata)
return prettified_xml.replace(""", '"')
class JSONSerializableModel(SerializableModel):
@classmethod
def parse(cls, response, obj=None, **kw):
if "parent" in kw:
kw["_parent"] = kw.pop("parent")
if isinstance(response, requests.Response):
# PY2 prefer bytes, while PY3 prefer str
response = response.content.decode() if six.PY3 else response.content
return cls.deserial(response, obj=obj, **kw)
def serialize(self, **kwargs):
root = self.serial()
return json.dumps(root, **kwargs)
class XMLTagField(SerializeField):
def parse(self, root, **kwargs):
node = _route_xml_path(root, *self._path_keys)
val = self._default
if node is not None:
val = node.tag
if val is None:
return
if self._parse_callback:
return self._parse_callback(val)
return val
def _set_default_keys(self, *keys):
super(XMLTagField, self)._set_default_keys(".")
class XMLNodeField(SerializeField):
def parse(self, root, **kwargs):
node = _route_xml_path(root, *self._path_keys)
val = self._default
if node is not None:
val = node.text or self._default
if val is None:
return
if self._parse_callback:
return self._parse_callback(val)
return val
def serialize(self, root, value):
value = value if value is not None else self._default
if value is None and self._blank_if_null:
value = ""
if not self._required and value is None:
return
node = _route_xml_path(root, create_if_not_exists=True, *self._path_keys)
if self._serialize_callback:
node.text = utils.to_text(self._serialize_callback(value))
else:
node.text = utils.to_text(value)
class XMLNodeAttributeField(SerializeField):
def __init__(self, *keys, **kwargs):
self._attr = kwargs.pop("attr", None)
super(XMLNodeAttributeField, self).__init__(*keys, **kwargs)
def parse(self, root, **kwargs):
assert self._attr is not None
node = _route_xml_path(root, *self._path_keys)
val = self._default
if node is not None:
val = node.get(self._attr)
if val is None:
return
if self._parse_callback:
return self._parse_callback(val)
return node.get(self._attr)
def serialize(self, root, value):
assert self._attr is not None
value = value if value is not None else self._default
if value is None:
if self._default is not None:
value = self._default
elif self._blank_if_null:
value = ""
if not self._required and value is None:
return
node = _route_xml_path(root, create_if_not_exists=True, *self._path_keys)
if self._serialize_callback:
node.set(self._attr, self._serialize_callback(value))
else:
node.set(self._attr, value)
def _set_default_keys(self, *keys):
if self._attr is None:
self._attr = keys[0]
class XMLNodesField(SerializeField):
def parse(self, root, **kwargs):
prev_path_keys = self._path_keys[:-1]
if prev_path_keys:
root = _route_xml_path(root, *prev_path_keys)
values = self._default
if root is not None:
values = [node.text for node in root.findall(self._path_keys[-1])]
if values is None:
return
if self._parse_callback:
return self._parse_callback(values)
return values
def serialize(self, root, value):
value = value if value is not None else self._default
if value is None and self._blank_if_null:
value = []
if not self._required and value is None:
return
if self._serialize_callback:
value = self._serialize_callback(value)
prev_path_keys = self._path_keys[:-1]
if prev_path_keys:
root = _route_xml_path(root, create_if_not_exists=True, *prev_path_keys)
for val in value:
element = ElementTree.Element(self._path_keys[-1])
element.text = utils.to_text(val)
root.append(element)
class XMLNodeReferenceField(HasSubModelField):
def parse(self, root, **kwargs):
node = _route_xml_path(root, *self._path_keys)
instance = self._default
if node is not None:
if issubclass(self._model, JSONSerializableModel):
node = node.text
instance = self._model.deserial(node, **kwargs)
if isinstance(instance, XMLSerializableModel) and instance._root is None:
instance._root = node.tag
if instance is None:
return
if self._parse_callback:
return self._parse_callback(instance)
return instance
def serialize(self, root, value):
value = value if value is not None else self._default
if not self._required and value is None:
return
if self._serialize_callback:
value = self._serialize_callback(value)
prev_path_keys = self._path_keys[:-1]
if prev_path_keys:
root = _route_xml_path(root, create_if_not_exists=True, *prev_path_keys)
if isinstance(value, XMLSerializableModel) and getattr(value, "_root") is None:
setattr(value, "_root", self._path_keys[-1])
val = value.serial()
if isinstance(value, JSONSerializableModel): # JSON mixed in XML
element = ElementTree.Element(self._path_keys[-1])
val = json.dumps(val)
element.text = val
root.append(element)
else:
root.append(val)
class XMLNodesReferencesField(HasSubModelField):
def parse(self, root, **kwargs):
prev_path_keys = self._path_keys[:-1]
if prev_path_keys:
root = _route_xml_path(root, *prev_path_keys)
instances = self._default
if root is not None:
instances = []
tag = self._path_keys[-1]
if tag == "*":
nodes = list(root)
else:
nodes = root.findall(self._path_keys[-1])
for node in nodes:
instance = self._model.deserial(node, **kwargs)
if (
isinstance(instance, XMLSerializableModel)
and instance._root is None
):
instance._root = node.tag
instances.append(instance)
if instances is None:
return
if self._parse_callback:
return self._parse_callback(instances)
return instances
def serialize(self, root, value):
value = value if value is not None else self._default
if value is None and self._blank_if_null:
value = []
if not self._required and value is None:
return
if self._serialize_callback:
value = self._serialize_callback(value)
prev_path_keys = self._path_keys[:-1]
if prev_path_keys:
root = _route_xml_path(root, create_if_not_exists=True, *prev_path_keys)
for it in value:
if isinstance(it, XMLSerializableModel) and getattr(it, "_root") is None:
setattr(it, "_root", self._path_keys[-1])
val = it.serial()
if isinstance(it, JSONSerializableModel):
element = ElementTree.Element(self._path_keys[-1])
val = json.dumps(val)
element.text = val
root.append(element)
else:
root.append(val)
class XMLNodePropertiesField(SerializeField):
def __init__(self, *keys, **kwargs):
super(XMLNodePropertiesField, self).__init__(*keys, **kwargs)
self._key_tag = kwargs.get("key_tag")
self._key_attr = kwargs.get("key_attr")
if not self._key_tag and not self._key_attr:
raise TypeError("Need to specify one of key_tag or key_attr")
self._value_tag = kwargs.get("value_tag")
if not self._key_attr and not self._value_tag:
raise TypeError("Need to specify key_attr when value_tag is absent")
def parse(self, root, **kwargs):
prev_path_keys = self._path_keys[:-1]
if prev_path_keys:
root = _route_xml_path(root, *prev_path_keys)
results = self._default
if root is not None:
results = OrderedDict()
for node in root.findall(self._path_keys[-1]):
if self._key_attr is not None:
key = node.attrib.get(self._key_attr)
else:
key_node = node.find(self._key_tag)
key = key_node.text if key_node is not None else None
if self._value_tag is not None:
value_node = node.find(self._value_tag)
value = value_node.text if value_node is not None else None
else:
value = node.text
if key is not None and value is not None:
results[key] = value
if results is None:
return
if self._parse_callback:
return self._parse_callback(results)
return results
def serialize(self, root, value):
value = value if value is not None else self._default
if value is None and self._blank_if_null:
value = OrderedDict()
if not self._required and value is None:
return
if self._serialize_callback:
value = self._serialize_callback(value)
prev_path_keys = self._path_keys[:-1]
if prev_path_keys:
root = _route_xml_path(root, create_if_not_exists=True, *prev_path_keys)
for k, v in six.iteritems(value):
element = ElementTree.Element(self._path_keys[-1])
if self._key_attr is not None:
element.set(self._key_attr, utils.to_text(k))
else:
key_node = ElementTree.Element(self._key_tag)
key_node.text = utils.to_text(k)
element.append(key_node)
if self._value_tag is not None:
value_node = ElementTree.Element(self._value_tag)
value_node.text = utils.to_text(v)
element.append(value_node)
else:
element.text = utils.to_text(v)
root.append(element)
class JSONNodeField(SerializeField):
def parse(self, root, **kwargs):
val = self._default
if root is not None:
val = _route_json_path(root, *self._path_keys)
if val is None:
return
val = self._to_str(val)
if self._parse_callback:
return self._parse_callback(val)
return val
def serialize(self, root, value):
value = value if value is not None else self._default
if value is None and self._blank_if_null:
value = ""
if not self._required and value is None:
return
prev_path_keys = self._path_keys[:-1]
if prev_path_keys:
node = _route_json_path(root, create_if_not_exists=True, *prev_path_keys)
else:
node = root
if self._serialize_callback:
node[self._path_keys[-1]] = self._serialize_callback(value)
else:
node[self._path_keys[-1]] = value
class JSONNodesField(SerializeField):
def parse(self, root, **kwargs):
prev_path_keys = self._path_keys[:-1]
if prev_path_keys:
root = _route_json_path(root, *prev_path_keys)
values = self._default
if root is not None:
values = [self._to_str(node[self._path_keys[-1]]) for node in root]
if values is None:
return
if self._parse_callback:
return self._parse_callback(values)
return values
def serialize(self, root, value):
value = value if value is not None else self._default
if value is None and self._blank_if_null:
value = []
if not self._required and value is None:
return
if self._serialize_callback:
value = self._serialize_callback(value)
assert len(self._path_keys) >= 2
prev_path_keys = self._path_keys[:-2]
if prev_path_keys:
root = _route_json_path(root, create_if_not_exists=True, *prev_path_keys)
node = self._path_keys[-2]
if node not in root:
root[node] = []
root = root[node]
for val in value:
root.append({self._path_keys[-1]: val})
class JSONNodeReferenceField(HasSubModelField):
def __init__(self, model, *keys, **kwargs):
super(JSONNodeReferenceField, self).__init__(model, *keys, **kwargs)
self._check_before = kwargs.get("check_before")
def parse(self, root, **kwargs):
instance = self._default
if root is not None:
node = _route_json_path(root, *self._path_keys)
if self._check_before is not None:
_check = _route_json_path(root, *self._check_before)
if not _check:
return
if node is not None:
instance = self._model.deserial(node, **kwargs)
if instance is None:
return
if self._parse_callback:
return self._parse_callback(instance)
return instance
def serialize(self, root, value):
value = value if value is not None else self._default
if not self._required and value is None:
return
if self._serialize_callback:
value = self._serialize_callback(value)
prev_path_keys = self._path_keys[:-1]
if prev_path_keys:
root = _route_json_path(root, create_if_not_exists=True, *prev_path_keys)
root[self._path_keys[-1]] = value.serial()
class JSONNodesReferencesField(HasSubModelField):
def parse(self, root, **kwargs):
instances = self._default
if isinstance(root, list):
instances = [self._model.deserial(node, **kwargs) for node in root]
elif root is not None:
prev_path_keys = self._path_keys[:-1]
if prev_path_keys:
root = _route_json_path(root, *prev_path_keys)
if root is not None:
root = root.get(self._path_keys[-1])
if root is not None:
instances = [self._model.deserial(node, **kwargs) for node in root]
if instances is None:
return
if self._parse_callback:
return self._parse_callback(instances)
return instances
def serialize(self, root, value):
value = value if value is not None else self._default
if value is None and self._blank_if_null:
value = []
if not self._required and value is None:
return
if self._serialize_callback:
value = self._serialize_callback(value)
prev_path_keys = self._path_keys[:-1]
if prev_path_keys:
root = _route_json_path(root, create_if_not_exists=True, *prev_path_keys)
key = self._path_keys[-1]
if key not in root:
root[key] = []
[root[key].append(it.serial()) for it in value]
class JSONRawField(JSONNodeField):
def _set_default_keys(self, *keys):
return