in azext_iot/sdk/deviceupdate/controlplane/_serialization.py [0:0]
def _serialize(self, target_obj, data_type=None, **kwargs):
"""Serialize data into a string according to type.
:param target_obj: The data to be serialized.
:param str data_type: The type to be serialized from.
:rtype: str, dict
:raises: SerializationError if serialization fails.
"""
key_transformer = kwargs.get("key_transformer", self.key_transformer)
keep_readonly = kwargs.get("keep_readonly", False)
if target_obj is None:
return None
attr_name = None
class_name = target_obj.__class__.__name__
if data_type:
return self.serialize_data(
target_obj, data_type, **kwargs)
if not hasattr(target_obj, "_attribute_map"):
data_type = type(target_obj).__name__
if data_type in self.basic_types.values():
return self.serialize_data(
target_obj, data_type, **kwargs)
# Force "is_xml" kwargs if we detect a XML model
try:
is_xml_model_serialization = kwargs["is_xml"]
except KeyError:
is_xml_model_serialization = kwargs.setdefault("is_xml", target_obj.is_xml_model())
serialized = {}
if is_xml_model_serialization:
serialized = target_obj._create_xml_node()
try:
attributes = target_obj._attribute_map
for attr, attr_desc in attributes.items():
attr_name = attr
if not keep_readonly and target_obj._validation.get(attr_name, {}).get('readonly', False):
continue
if attr_name == "additional_properties" and attr_desc["key"] == '':
if target_obj.additional_properties is not None:
serialized.update(target_obj.additional_properties)
continue
try:
orig_attr = getattr(target_obj, attr)
if is_xml_model_serialization:
pass # Don't provide "transformer" for XML for now. Keep "orig_attr"
else: # JSON
keys, orig_attr = key_transformer(attr, attr_desc.copy(), orig_attr)
keys = keys if isinstance(keys, list) else [keys]
kwargs["serialization_ctxt"] = attr_desc
new_attr = self.serialize_data(orig_attr, attr_desc['type'], **kwargs)
if is_xml_model_serialization:
xml_desc = attr_desc.get('xml', {})
xml_name = xml_desc.get('name', attr_desc['key'])
xml_prefix = xml_desc.get('prefix', None)
xml_ns = xml_desc.get('ns', None)
if xml_desc.get("attr", False):
if xml_ns:
ET.register_namespace(xml_prefix, xml_ns)
xml_name = "{}{}".format(xml_ns, xml_name)
serialized.set(xml_name, new_attr)
continue
if xml_desc.get("text", False):
serialized.text = new_attr
continue
if isinstance(new_attr, list):
serialized.extend(new_attr)
elif isinstance(new_attr, ET.Element):
# If the down XML has no XML/Name, we MUST replace the tag with the local tag. But keeping the namespaces.
if 'name' not in getattr(orig_attr, '_xml_map', {}):
splitted_tag = new_attr.tag.split("}")
if len(splitted_tag) == 2: # Namespace
new_attr.tag = "}".join([splitted_tag[0], xml_name])
else:
new_attr.tag = xml_name
serialized.append(new_attr)
else: # That's a basic type
# Integrate namespace if necessary
local_node = _create_xml_node(
xml_name,
xml_prefix,
xml_ns
)
local_node.text = unicode_str(new_attr)
serialized.append(local_node)
else: # JSON
for k in reversed(keys):
unflattened = {k: new_attr}
new_attr = unflattened
_new_attr = new_attr
_serialized = serialized
for k in keys:
if k not in _serialized:
_serialized.update(_new_attr)
_new_attr = _new_attr[k]
_serialized = _serialized[k]
except ValueError:
continue
except (AttributeError, KeyError, TypeError) as err:
msg = "Attribute {} in object {} cannot be serialized.\n{}".format(
attr_name, class_name, str(target_obj))
raise_with_traceback(SerializationError, msg, err)
else:
return serialized