in metaflow/plugins/env_escape/data_transferer.py [0:0]
def _transform_container(self, checker, processor, recursor, obj, in_place=True):
def _sub_process(obj):
obj_type = type(obj)
if obj is None or obj_type in _simple_types or obj_type == str:
return None
elif obj_type in _container_types:
return recursor(obj)
elif checker(obj):
return processor(obj)
else:
raise RuntimeError(
"Cannot pickle object of type %s: %s" % (obj_type, str(obj))
)
cast_to = None
key_change_allowed = True
update_default_factory = False
has_changes = False
if isinstance(obj, (tuple, set, frozenset)):
cast_to = type(obj)
obj = list(obj)
in_place = True # We can do in place since we copied the object
if isinstance(obj, OrderedDict):
key_change_allowed = False
if isinstance(obj, defaultdict):
# In this case, we use a hack to store the default_factory
# function inside the dictionary which will get it pickled by
# the server (as a reference). This is because if this is a lambda
# it won't pickle well.
if callable(obj.default_factory):
if not in_place:
obj = copy(obj)
in_place = True
obj["__default_factory"] = obj.default_factory
obj.default_factory = None
elif obj.get("__default_factory") is not None:
# This is in the unpickle path, we need to reset the factory properly
update_default_factory = True
has_changes = True
# We now deal with list or dict
if isinstance(obj, list):
for idx in range(len(obj)):
sub_obj = _sub_process(obj[idx])
if sub_obj is not None:
has_changes = True
if not in_place:
obj = list(obj)
in_place = True
obj[idx] = sub_obj
elif isinstance(obj, dict):
new_items = {}
del_keys = []
for k, v in obj.items():
sub_key = _sub_process(k)
if sub_key is not None:
if not key_change_allowed:
raise RuntimeError(
"OrderedDict key cannot contain references -- "
"this would change the order"
)
has_changes = True
sub_val = _sub_process(v)
if sub_val is not None:
has_changes = True
if has_changes and not in_place:
obj = copy(obj)
in_place = True
if sub_key:
if sub_val:
new_items[sub_key] = sub_val
else:
new_items[sub_key] = v
del_keys.append(k)
else:
if sub_val:
obj[k] = sub_val
for k in del_keys:
del obj[k]
obj.update(new_items)
else:
raise RuntimeError("Unknown container type: %s" % type(obj))
if update_default_factory:
# We do this here because we now unpickled the reference
# to default_dict and can set it back up again.
obj.default_factory = obj["__default_factory"]
del obj["__default_factory"]
if has_changes:
if cast_to:
return cast_to(obj)
return obj
return None