def _transform_container()

in metaflow/plugins/env_escape/data_transferer.py [0:0]


    def _transform_container(self, checker, processor, recursor, obj, in_place=True):
        def _sub_process(obj):
            obj_type = type(obj)
            if obj is None or obj_type in _simple_types or obj_type == str:
                return None
            elif obj_type in _container_types:
                return recursor(obj)
            elif checker(obj):
                return processor(obj)
            else:
                raise RuntimeError(
                    "Cannot pickle object of type %s: %s" % (obj_type, str(obj))
                )

        cast_to = None
        key_change_allowed = True
        update_default_factory = False
        has_changes = False
        if isinstance(obj, (tuple, set, frozenset)):
            cast_to = type(obj)
            obj = list(obj)
            in_place = True  # We can do in place since we copied the object
        if isinstance(obj, OrderedDict):
            key_change_allowed = False
        if isinstance(obj, defaultdict):
            # In this case, we use a hack to store the default_factory
            # function inside the dictionary which will get it pickled by
            # the server (as a reference). This is because if this is a lambda
            # it won't pickle well.
            if callable(obj.default_factory):
                if not in_place:
                    obj = copy(obj)
                    in_place = True
                obj["__default_factory"] = obj.default_factory
                obj.default_factory = None
            elif obj.get("__default_factory") is not None:
                # This is in the unpickle path, we need to reset the factory properly
                update_default_factory = True
            has_changes = True
        # We now deal with list or dict
        if isinstance(obj, list):
            for idx in range(len(obj)):
                sub_obj = _sub_process(obj[idx])
                if sub_obj is not None:
                    has_changes = True
                    if not in_place:
                        obj = list(obj)
                        in_place = True
                    obj[idx] = sub_obj
        elif isinstance(obj, dict):
            new_items = {}
            del_keys = []
            for k, v in obj.items():
                sub_key = _sub_process(k)
                if sub_key is not None:
                    if not key_change_allowed:
                        raise RuntimeError(
                            "OrderedDict key cannot contain references -- "
                            "this would change the order"
                        )
                    has_changes = True
                sub_val = _sub_process(v)
                if sub_val is not None:
                    has_changes = True
                if has_changes and not in_place:
                    obj = copy(obj)
                    in_place = True
                if sub_key:
                    if sub_val:
                        new_items[sub_key] = sub_val
                    else:
                        new_items[sub_key] = v
                    del_keys.append(k)
                else:
                    if sub_val:
                        obj[k] = sub_val
            for k in del_keys:
                del obj[k]
            obj.update(new_items)
        else:
            raise RuntimeError("Unknown container type: %s" % type(obj))
        if update_default_factory:
            # We do this here because we now unpickled the reference
            # to default_dict and can set it back up again.
            obj.default_factory = obj["__default_factory"]
            del obj["__default_factory"]
        if has_changes:
            if cast_to:
                return cast_to(obj)
            return obj
        return None