def inner_init()

in metaflow/plugins/env_escape/client.py [0:0]


    def inner_init(self, python_executable, pythonpath, max_pickle_version, config_dir):
        """Launch the escape server and set up all client-side state.

        The method starts the server as a subprocess, loads the optional
        ``overrides`` module that lives in ``config_dir``, waits for the
        server's Unix socket to appear, connects to it, asks the server for
        everything it exports, and finally registers the local overrides
        and exception deserializers against those exports.

        Parameters
        ----------
        python_executable
            Interpreter used as the first element of the server command line.
        pythonpath
            Value assigned to ``PYTHONPATH`` in the server's environment.
        max_pickle_version
            Stored as ``data_transferer.defaultProtocol`` and forwarded (as a
            string) on the server command line.
        config_dir
            Path of the configuration directory. Its parent is used as the
            server's working directory and its basename is passed to the
            server as the configuration name. One of its path components
            must be named ``env_escape``.

        Raises
        ------
        RuntimeError
            If the socket path already exists, if no ``env_escape`` component
            is found in ``config_dir``, if the overrides module fails to
            import for a reason other than ``ImportError``, or if the server
            process exits before its socket appears.
        ValueError
            If an override refers to a name that is not proxied, if the same
            name is overridden twice for an object, if an exception
            deserializer refers to a non-exception, or if an exception has
            more than one deserializer.
        """
        # Make sure to init these variables (used in __del__) early on in case we
        # have an exception
        self._poller = None
        self._poller_lock = threading.Lock()
        self._active_pid = os.getpid()
        self._server_process = None
        self._socket_path = None

        data_transferer.defaultProtocol = max_pickle_version

        self._config_dir = config_dir
        server_path, server_config = os.path.split(config_dir)
        # The client launches the server when created; we use
        # Unix sockets for now
        server_module = ".".join([__package__, "server"])
        self._socket_path = "/tmp/%s_%d" % (server_config, self._active_pid)
        if os.path.exists(self._socket_path):
            raise RuntimeError("Existing socket: %s" % self._socket_path)
        env = os.environ.copy()
        env["PYTHONPATH"] = pythonpath
        # When coming from a conda environment, LD_LIBRARY_PATH may be set to
        # first include the Conda environment's library. When breaking out to
        # the underlying python, we need to reset it to the original LD_LIBRARY_PATH
        ld_lib_path = env.get("LD_LIBRARY_PATH")
        orig_ld_lib_path = env.get("MF_ORIG_LD_LIBRARY_PATH")
        if ld_lib_path is not None and orig_ld_lib_path is not None:
            env["LD_LIBRARY_PATH"] = orig_ld_lib_path
            # The saved value has been restored; do not leak it to the server
            del env["MF_ORIG_LD_LIBRARY_PATH"]
        self._server_process = Popen(
            [
                python_executable,
                "-u",
                "-m",
                server_module,
                str(max_pickle_version),
                server_config,
                self._socket_path,
            ],
            cwd=server_path,
            env=env,
            stdout=PIPE,
            stderr=PIPE,
            bufsize=1,
            universal_newlines=True,  # Forces text as well
        )

        # Read override configuration
        # We can't just import the "overrides" module because that does not
        # distinguish it from other modules named "overrides" (either a third party
        # lib -- there is one -- or just other escaped modules). We therefore load
        # a fuller path to distinguish them from one another.
        # This is a bit tricky though:
        #  - it requires all `configurations` directories to NOT have a __init__.py
        #    so that configurations can be loaded through extensions too. If this is
        #    not the case, we will have a `configurations` module that will be registered
        #    and not be a proper namespace package
        #  - we want to import a specific file so we:
        #    - set a prefix that is specific enough to NOT include anything outside
        #      of the configuration so we end the prefix with "env_escape" (we know
        #      that is in the path of all configurations). We could technically go
        #      up to metaflow or metaflow_extensions BUT this then causes issues with
        #      the extension mechanism and _resolve_relative_path in plugins (because
        #      there are files loaded from plugins that refer to something outside of
        #      plugins and if we load plugins and NOT metaflow.plugins, this breaks).
        #    - set the package root from this prefix to everything up to overrides
        #    - load the overrides file
        #
        # This way, we are sure that we are:
        #  - loading this specific overrides
        #  - not adding extra stuff to the prefix that we don't care about
        #  - able to support configurations in both metaflow and extensions at the
        #    same time
        pkg_components = []
        prefix, last_basename = os.path.split(config_dir)
        while True:
            pkg_components.append(last_basename)
            possible_prefix, last_basename = os.path.split(prefix)
            if last_basename == "env_escape":
                # `prefix` is now the path that ends with "env_escape"
                break
            if possible_prefix == prefix:
                # os.path.split has reached a fixed point (filesystem root or
                # an empty relative path): there is no "env_escape" component
                # and continuing would loop forever.
                raise RuntimeError(
                    "Cannot find 'env_escape' in configuration path '%s'" % config_dir
                )
            prefix = possible_prefix

        # Temporarily expose the "env_escape" directory so that the overrides
        # can be imported relative to it. sys.path is modified in place (and
        # restored by value) so that the list object is never replaced and
        # an entry added by the imported module is not removed by mistake.
        sys.path.insert(0, prefix)
        try:
            override_module = importlib.import_module(
                ".overrides", package=".".join(reversed(pkg_components))
            )
            override_values = override_module.__dict__.values()
        except ImportError:
            # We ignore so the file can be non-existent if not needed
            override_values = []
        except Exception as e:
            raise RuntimeError(
                "Cannot import overrides from '%s': %s" % (prefix, str(e))
            )
        finally:
            try:
                sys.path.remove(prefix)
            except ValueError:
                # Somebody already removed our entry; nothing left to undo
                pass

        self._proxied_objects = {}

        # Wait for the socket to be up on the other side; we also check if the
        # server had issues starting up in which case we report that and crash out
        while not os.path.exists(self._socket_path):
            returncode = self._server_process.poll()
            if returncode is not None:
                raise RuntimeError(
                    "Server did not properly start: %s"
                    % self._server_process.stderr.read(),
                )
            time.sleep(1)
        # Open up the channel and set up the datatransferer pipeline
        self._channel = Channel(SocketByteStream.unixconnect(self._socket_path))
        self._datatransferer = DataTransferer(self)

        # Make PIPEs non-blocking; this is helpful to be able to
        # order the messages properly
        for f in (self._server_process.stdout, self._server_process.stderr):
            fl = fcntl.fcntl(f, fcntl.F_GETFL)
            fcntl.fcntl(f, fcntl.F_SETFL, fl | os.O_NONBLOCK)

        # Set up poller
        with self._poller_lock:
            self._poller = select.poll()
            self._poller.register(self._server_process.stdout, select.POLLIN)
            self._poller.register(self._server_process.stderr, select.POLLIN)
            self._poller.register(self._channel, select.POLLIN | select.POLLHUP)

        # Get all exports that we are proxying
        response = self._communicate(
            {FIELD_MSGTYPE: MSG_CONTROL, FIELD_OPTYPE: CONTROL_GETEXPORTS}
        )

        # Every exported class, proxied name and exception name starts out
        # mapped to None
        self._proxied_classes = {
            k: None
            for k in itertools.chain(
                response[FIELD_CONTENT]["classes"],
                response[FIELD_CONTENT]["proxied"],
                (e[0] for e in response[FIELD_CONTENT]["exceptions"]),
            )
        }

        self._exception_hierarchy = dict(response[FIELD_CONTENT]["exceptions"])
        self._proxied_classnames = set(response[FIELD_CONTENT]["classes"]).union(
            response[FIELD_CONTENT]["proxied"]
        )
        self._aliases = response[FIELD_CONTENT]["aliases"]

        # Determine all overrides
        self._overrides = {}
        self._getattr_overrides = {}
        self._setattr_overrides = {}
        self._exception_deserializers = {}
        for override in override_values:
            if isinstance(override, (LocalOverride, LocalAttrOverride)):
                for obj_name, obj_funcs in override.obj_mapping.items():
                    canonical_name = get_canonical_name(obj_name, self._aliases)
                    if canonical_name not in self._proxied_classes:
                        raise ValueError(
                            "%s does not refer to a proxied or override type" % obj_name
                        )
                    # Pick the table this override belongs to: method
                    # overrides, setattr overrides or getattr overrides
                    if isinstance(override, LocalOverride):
                        override_dict = self._overrides.setdefault(canonical_name, {})
                    elif override.is_setattr:
                        override_dict = self._setattr_overrides.setdefault(
                            canonical_name, {}
                        )
                    else:
                        override_dict = self._getattr_overrides.setdefault(
                            canonical_name, {}
                        )
                    # A single name may be given as a bare string
                    if isinstance(obj_funcs, str):
                        obj_funcs = (obj_funcs,)
                    for name in obj_funcs:
                        if name in override_dict:
                            raise ValueError(
                                "%s was already overridden for %s" % (name, obj_name)
                            )
                        override_dict[name] = override.func
            if isinstance(override, LocalExceptionDeserializer):
                canonical_name = get_canonical_name(override.class_path, self._aliases)
                if canonical_name not in self._exception_hierarchy:
                    raise ValueError(
                        "%s does not refer to an exception type" % override.class_path
                    )
                cur_des = self._exception_deserializers.get(canonical_name, None)
                if cur_des is not None:
                    raise ValueError(
                        "Exception %s has multiple deserializers" % override.class_path
                    )
                self._exception_deserializers[canonical_name] = override.deserializer

        # Proxied standalone functions are functions that are proxied
        # as part of other objects like defaultdict for which we create a
        # on-the-fly simple class that is just a callable. This is therefore
        # a special type of self._proxied_classes
        self._proxied_standalone_functions = {}

        self._export_info = {
            "classes": response[FIELD_CONTENT]["classes"],
            "functions": response[FIELD_CONTENT]["functions"],
            "values": response[FIELD_CONTENT]["values"],
            "exceptions": response[FIELD_CONTENT]["exceptions"],
            "aliases": response[FIELD_CONTENT]["aliases"],
        }