def __init__()

in metaflow/plugins/env_escape/server.py [0:0]


    def __init__(self, config_dir, max_pickle_version):
        """
        Set up the server from the configuration package `config_dir`.

        Loads the `server_mappings` module (required) and the `overrides`
        module (optional) relative to `config_dir`, then builds the lookup
        tables for exported classes, functions, values and exceptions, the
        alias table, the override tables, the topologically ordered list of
        exported exceptions and the operation dispatch table.

        Parameters
        ----------
        config_dir : str
            Name of the package against which `.server_mappings` and
            `.overrides` are imported (passed as `package=` to
            `importlib.import_module`).
        max_pickle_version : int
            Stored on the instance and also assigned to
            `data_transferer.defaultProtocol`.

        Raises
        ------
        RuntimeError
            If `server_mappings` cannot be imported, or if importing
            `overrides` fails with anything other than an `ImportError`.
        ValueError
            If aliases are conflicting or circular, if an override refers to
            an unknown type or is defined twice, if an exception serializer
            refers to a non exported exception or is defined twice, or if an
            exported exception is not rooted in a builtin exception or has a
            parent that is neither exported nor builtin.
        """
        # NOTE: this chained assignment also updates the module-level default
        # protocol in data_transferer, not just this instance.
        self._max_pickle_version = data_transferer.defaultProtocol = max_pickle_version
        try:
            mappings = importlib.import_module(".server_mappings", package=config_dir)
        except Exception as e:
            raise RuntimeError(
                "Cannot import server_mappings from '%s': %s" % (sys.path[0], str(e))
            )
        try:
            # Import module as a relative package to make sure that it is consistent
            # with how the client does it -- this enables us to do the same type of
            # relative imports in overrides
            override_module = importlib.import_module(".overrides", package=config_dir)
            override_values = override_module.__dict__.values()
        except ImportError:
            # We ignore so the file can be non-existent if not needed
            override_values = []
        except Exception as e:
            raise RuntimeError(
                "Cannot import overrides from '%s': %s" % (sys.path[0], str(e))
            )

        # Each call to _flatten_dict yields the lookup table and a second mapping
        # (a1..a4) that is consumed below as {base_name: iterable of aliases}.
        self._aliases = {}
        self._known_classes, a1 = self._flatten_dict(mappings.EXPORTED_CLASSES)
        self._class_types_to_names = {v: k for k, v in self._known_classes.items()}
        self._known_funcs, a2 = self._flatten_dict(mappings.EXPORTED_FUNCTIONS)
        self._known_vals, a3 = self._flatten_dict(mappings.EXPORTED_VALUES)
        self._known_exceptions, a4 = self._flatten_dict(mappings.EXPORTED_EXCEPTIONS)
        # Proxied types are keyed by their fully qualified "module.name"
        self._proxied_types = {
            "%s.%s" % (t.__module__, t.__name__): t for t in mappings.PROXIED_CLASSES
        }
        self._class_types_to_names.update(
            {v: k for k, v in self._proxied_types.items()}
        )

        # We will also proxy functions from objects as needed. This is useful
        # for defaultdict for example since the `default_factory` function is a
        # lambda that needs to be transferred.
        self._class_types_to_names[type(lambda x: x)] = "function"

        # Update all alias information
        for base_name, aliases in itertools.chain(
            a1.items(), a2.items(), a3.items(), a4.items()
        ):
            for alias in aliases:
                # setdefault returns the already registered base name if there
                # is one, which lets us detect conflicting registrations
                a = self._aliases.setdefault(alias, base_name)
                if a != base_name:
                    # Technically we could have a that aliases b and b that aliases c
                    # and then a that aliases c. This would error out in that case
                    # even though it is valid. It is easy for the user to get around
                    # this by listing aliases in the same order so we don't support
                    # it for now.
                    raise ValueError(
                        "%s is an alias to both %s and %s -- make sure all aliases "
                        "are listed in the same order" % (alias, base_name, a)
                    )

        # Detect circular aliases. If a user lists ("a", "b") and then ("b", "a"), we
        # will have an entry in aliases saying b is an alias for a and a is an alias
        # for b which is a recipe for disaster since we no longer have a canonical name
        # for things.
        for alias, base_name in self._aliases.items():
            if base_name in self._aliases:
                raise ValueError(
                    "%s and %s are circular aliases -- make sure all aliases "
                    "are listed in the same order" % (alias, base_name)
                )

        # Determine if we have any overrides
        self._overrides = {}
        self._getattr_overrides = {}
        self._setattr_overrides = {}
        self._exception_serializers = {}
        for override in override_values:
            if isinstance(override, (RemoteAttrOverride, RemoteOverride)):
                for obj_name, obj_funcs in override.obj_mapping.items():
                    canonical_name = get_canonical_name(obj_name, self._aliases)
                    # Exported classes are looked up by canonical name; proxied
                    # types are looked up by the name exactly as given
                    obj_type = self._known_classes.get(
                        canonical_name, self._proxied_types.get(obj_name)
                    )
                    if obj_type is None:
                        raise ValueError(
                            "%s does not refer to a proxied or exported type" % obj_name
                        )
                    # Pick the table (method, setattr or getattr overrides)
                    # that this override belongs to
                    if isinstance(override, RemoteOverride):
                        override_dict = self._overrides.setdefault(obj_type, {})
                    elif override.is_setattr:
                        override_dict = self._setattr_overrides.setdefault(obj_type, {})
                    else:
                        override_dict = self._getattr_overrides.setdefault(obj_type, {})
                    # A single name may be given as a bare string
                    if isinstance(obj_funcs, str):
                        obj_funcs = (obj_funcs,)
                    for name in obj_funcs:
                        if name in override_dict:
                            raise ValueError(
                                "%s was already overridden for %s" % (name, obj_name)
                            )
                        override_dict[name] = override.func
            elif isinstance(override, RemoteExceptionSerializer):
                canonical_name = get_canonical_name(override.class_path, self._aliases)
                if canonical_name not in self._known_exceptions:
                    raise ValueError(
                        "%s does not refer to an exported exception"
                        % override.class_path
                    )
                # Serializers are stored under the canonical name so the duplicate
                # check has to use it as well; otherwise a serializer registered
                # through an alias would silently replace an existing one.
                if canonical_name in self._exception_serializers:
                    raise ValueError(
                        "%s exception serializer already defined" % override.class_path
                    )
                self._exception_serializers[canonical_name] = override.serializer

        # Process the exceptions making sure we have all the ones we need and building a
        # topologically sorted list for the client to instantiate
        name_to_parent_count = {}
        name_to_parents = {}
        parent_to_child = {}

        for ex_name, ex_cls in self._known_exceptions.items():
            ex_name_canonical = get_canonical_name(ex_name, self._aliases)
            parents = []
            # Walk the MRO (skipping the class itself) until the first builtin
            # exception; every class before it must itself be exported
            for base in ex_cls.__mro__[1:]:
                if base is object:
                    raise ValueError(
                        "Exported exceptions not rooted in a builtin exception "
                        "are not supported: %s." % ex_name
                    )
                if base.__module__ == "builtins":
                    # We found our base exception
                    parents.append("builtins." + base.__name__)
                    break
                else:
                    fqn = ".".join([base.__module__, base.__name__])
                    canonical_fqn = get_canonical_name(fqn, self._aliases)
                    if canonical_fqn in self._known_exceptions:
                        parents.append(canonical_fqn)
                        children = parent_to_child.setdefault(canonical_fqn, [])
                        children.append(ex_name_canonical)
                    else:
                        raise ValueError(
                            "Exported exception %s has non exported and non builtin parent "
                            "exception: %s (%s). Known exceptions: %s."
                            % (ex_name, fqn, canonical_fqn, str(self._known_exceptions))
                        )
            # The last entry in parents is the builtin root so we do not count it:
            # this is the number of exported parents that must be emitted first
            name_to_parent_count[ex_name_canonical] = len(parents) - 1
            name_to_parents[ex_name_canonical] = parents

        # We now form the exceptions and put them in self._known_exceptions in
        # the proper order (topologically). Note that self._known_exceptions
        # changes from a dictionary to a list of (name, parents) tuples here.
        self._known_exceptions = []
        # Find roots
        to_process = []
        for name, count in name_to_parent_count.items():
            if count == 0:
                to_process.append(name)

        # Topologically process the exceptions
        while to_process:
            next_round = []
            for name in to_process:
                self._known_exceptions.append((name, name_to_parents[name]))
                del name_to_parent_count[name]
                for child in parent_to_child.get(name, []):
                    cur_child_count = name_to_parent_count[child]
                    if cur_child_count == 1:
                        # This was the last outstanding parent; the child can be
                        # emitted in the next round
                        next_round.append(child)
                    else:
                        name_to_parent_count[child] = cur_child_count - 1
            to_process = next_round

        # Anything left over never had all of its parents emitted
        if name_to_parent_count:
            raise ValueError(
                "Badly rooted exceptions: %s" % ", ".join(name_to_parent_count.keys())
            )
        self._active = False
        self._channel = None
        self._datatransferer = DataTransferer(self)

        # Dispatch table from operation code to the method handling it
        self._handlers = {
            OP_GETATTR: self._handle_getattr,
            OP_SETATTR: self._handle_setattr,
            OP_DELATTR: self._handle_delattr,
            OP_CALL: self._handle_call,
            OP_CALLATTR: self._handle_callattr,
            OP_REPR: self._handle_repr,
            OP_STR: self._handle_str,
            OP_HASH: self._handle_hash,
            OP_PICKLE: self._handle_pickle,
            OP_DEL: self._handle_del,
            OP_GETMETHODS: self._handle_getmethods,
            OP_DIR: self._handle_dir,
            OP_CALLFUNC: self._handle_callfunc,
            OP_CALLONCLASS: self._handle_callonclass,
            OP_GETVAL: self._handle_getval,
            OP_SETVAL: self._handle_setval,
            OP_INIT: self._handle_init,
            OP_SUBCLASSCHECK: self._handle_subclasscheck,
        }

        self._local_objects = {}