in metaflow/plugins/env_escape/client.py [0:0]
def inner_init(self, python_executable, pythonpath, max_pickle_version, config_dir):
    """Launch the env-escape server process and initialize this client.

    Starts the ``server`` module of this package in a separate Python
    interpreter, waits for it to create a Unix domain socket, connects to it,
    loads an optional client-side ``overrides`` module from ``config_dir``,
    and queries the server for everything it exports (classes, proxied
    objects, exceptions, values, aliases).

    Parameters
    ----------
    python_executable : str
        Path to the Python interpreter used to run the server.
    pythonpath : str
        Value set as PYTHONPATH in the server's environment.
    max_pickle_version : int
        Pickle protocol ceiling; stored as the data transferer's default
        protocol and passed to the server on its command line.
    config_dir : str
        Directory holding the escape configuration; its basename is the
        configuration name and determines the socket path.

    Raises
    ------
    RuntimeError
        If the socket path already exists, if the server process exits
        before creating its socket, or if the overrides module fails to
        import for a reason other than being absent.
    ValueError
        If an override or exception deserializer refers to a name the
        server does not proxy, or if the same name is overridden twice.
    """
    # Make sure to init these variables (used in __del__) early on in case we
    # have an exception
    self._poller = None
    self._poller_lock = threading.Lock()
    self._active_pid = os.getpid()
    self._server_process = None
    self._socket_path = None
    data_transferer.defaultProtocol = max_pickle_version
    self._config_dir = config_dir
    # server_path is the parent dir (used as the server's cwd);
    # server_config is the configuration name (basename of config_dir).
    server_path, server_config = os.path.split(config_dir)
    # The client launches the server when created; we use
    # Unix sockets for now
    server_module = ".".join([__package__, "server"])
    # Socket path is keyed on config name + client PID so concurrent clients
    # (for different configs or processes) do not collide.
    self._socket_path = "/tmp/%s_%d" % (server_config, self._active_pid)
    if os.path.exists(self._socket_path):
        # A leftover socket means a previous server for this exact
        # config/PID pair was not cleaned up; refuse to clobber it.
        raise RuntimeError("Existing socket: %s" % self._socket_path)
    env = os.environ.copy()
    env["PYTHONPATH"] = pythonpath
    # When coming from a conda environment, LD_LIBRARY_PATH may be set to
    # first include the Conda environment's library. When breaking out to
    # the underlying python, we need to reset it to the original LD_LIBRARY_PATH
    ld_lib_path = env.get("LD_LIBRARY_PATH")
    orig_ld_lib_path = env.get("MF_ORIG_LD_LIBRARY_PATH")
    if ld_lib_path is not None and orig_ld_lib_path is not None:
        env["LD_LIBRARY_PATH"] = orig_ld_lib_path
    # The saved original is consumed here so it does not leak into the
    # server's environment.
    if orig_ld_lib_path is not None:
        del env["MF_ORIG_LD_LIBRARY_PATH"]
    # Server argv: <python> -u -m <pkg>.server <max_pickle_version> <config> <socket>
    # -u and universal_newlines give us unbuffered, text-mode pipes so the
    # stdout/stderr forwarding below sees output promptly.
    self._server_process = Popen(
        [
            python_executable,
            "-u",
            "-m",
            server_module,
            str(max_pickle_version),
            server_config,
            self._socket_path,
        ],
        cwd=server_path,
        env=env,
        stdout=PIPE,
        stderr=PIPE,
        bufsize=1,
        universal_newlines=True,  # Forces text as well
    )
    # Read override configuration
    # We can't just import the "overrides" module because that does not
    # distinguish it from other modules named "overrides" (either a third party
    # lib -- there is one -- or just other escaped modules). We therefore load
    # a fuller path to distinguish them from one another.
    # This is a bit tricky though:
    #  - it requires all `configurations` directories to NOT have a __init__.py
    #    so that configurations can be loaded through extensions too. If this is
    #    not the case, we will have a `configurations` module that will be registered
    #    and not be a proper namespace package
    #  - we want to import a specific file so we:
    #    - set a prefix that is specific enough to NOT include anything outside
    #      of the configuration so we end the prefix with "env_escape" (we know
    #      that is in the path of all configurations). We could technically go
    #      up to metaflow or metaflow_extensions BUT this then causes issues with
    #      the extension mechanism and _resolve_relative_path in plugins (because
    #      there are files loaded from plugins that refer to something outside of
    #      plugins and if we load plugins and NOT metaflow.plugins, this breaks).
    #    - set the package root from this prefix to everything up to overrides
    #    - load the overrides file
    #
    # This way, we are sure that we are:
    #  - loading this specific overrides
    #  - not adding extra stuff to the prefix that we don't care about
    #  - able to support configurations in both metaflow and extensions at the
    #    same time
    # Walk config_dir upward, collecting path components until we hit the
    # "env_escape" directory; `prefix` ends at .../env_escape and the reversed
    # components form the dotted package name under it.
    # NOTE(review): if "env_escape" is not an ancestor of config_dir this loop
    # never breaks — presumably guaranteed by the caller; confirm.
    pkg_components = []
    prefix, last_basename = os.path.split(config_dir)
    while True:
        pkg_components.append(last_basename)
        possible_prefix, last_basename = os.path.split(prefix)
        if last_basename == "env_escape":
            break
        prefix = possible_prefix
    try:
        # Temporarily put .../env_escape at the head of sys.path so the
        # relative import below resolves against this exact configuration.
        sys.path.insert(0, prefix)
        override_module = importlib.import_module(
            ".overrides", package=".".join(reversed(pkg_components))
        )
        override_values = override_module.__dict__.values()
    except ImportError:
        # We ignore so the file can be non-existent if not needed
        override_values = []
    except Exception as e:
        # NOTE(review): `raise ... from e` would preserve the original
        # traceback here.
        raise RuntimeError(
            "Cannot import overrides from '%s': %s" % (sys.path[0], str(e))
        )
    finally:
        # Undo the sys.path.insert(0, ...) above.
        # NOTE(review): rebinding sys.path (vs `del sys.path[0]`) replaces the
        # list object; code holding a reference to the old list won't see it.
        sys.path = sys.path[1:]
    self._proxied_objects = {}
    # Wait for the socket to be up on the other side; we also check if the
    # server had issues starting up in which case we report that and crash out
    # NOTE(review): no overall timeout — if the server hangs without exiting
    # or creating the socket, this loop spins forever (1s per iteration).
    while not os.path.exists(self._socket_path):
        returncode = self._server_process.poll()
        if returncode is not None:
            # Server died before creating its socket; surface its stderr.
            raise RuntimeError(
                "Server did not properly start: %s"
                % self._server_process.stderr.read(),
            )
        time.sleep(1)
    # Open up the channel and set up the datatransferer pipeline
    self._channel = Channel(SocketByteStream.unixconnect(self._socket_path))
    self._datatransferer = DataTransferer(self)
    # Make PIPEs non-blocking; this is helpful to be able to
    # order the messages properly
    for f in (self._server_process.stdout, self._server_process.stderr):
        fl = fcntl.fcntl(f, fcntl.F_GETFL)
        fcntl.fcntl(f, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # Set up poller
    # The lock guards against concurrent access (the poller is also touched
    # from __del__ per the comment at the top of this method).
    with self._poller_lock:
        self._poller = select.poll()
        self._poller.register(self._server_process.stdout, select.POLLIN)
        self._poller.register(self._server_process.stderr, select.POLLIN)
        self._poller.register(self._channel, select.POLLIN | select.POLLHUP)
    # Get all exports that we are proxying
    response = self._communicate(
        {FIELD_MSGTYPE: MSG_CONTROL, FIELD_OPTYPE: CONTROL_GETEXPORTS}
    )
    # All names the server proxies (classes, proxied objects and exception
    # types); values start as None and are presumably filled lazily with the
    # generated stub classes elsewhere -- confirm against the rest of the file.
    self._proxied_classes = {
        k: None
        for k in itertools.chain(
            response[FIELD_CONTENT]["classes"],
            response[FIELD_CONTENT]["proxied"],
            (e[0] for e in response[FIELD_CONTENT]["exceptions"]),
        )
    }
    self._exception_hierarchy = dict(response[FIELD_CONTENT]["exceptions"])
    self._proxied_classnames = set(response[FIELD_CONTENT]["classes"]).union(
        response[FIELD_CONTENT]["proxied"]
    )
    self._aliases = response[FIELD_CONTENT]["aliases"]
    # Determine all overrides
    # Overrides are keyed by canonical class name; each maps attribute/method
    # name -> override function.
    self._overrides = {}
    self._getattr_overrides = {}
    self._setattr_overrides = {}
    self._exception_deserializers = {}
    for override in override_values:
        if isinstance(override, (LocalOverride, LocalAttrOverride)):
            for obj_name, obj_funcs in override.obj_mapping.items():
                canonical_name = get_canonical_name(obj_name, self._aliases)
                if canonical_name not in self._proxied_classes:
                    raise ValueError(
                        "%s does not refer to a proxied or override type" % obj_name
                    )
                # Pick the right table: method overrides vs setattr/getattr
                # attribute overrides.
                if isinstance(override, LocalOverride):
                    override_dict = self._overrides.setdefault(canonical_name, {})
                elif override.is_setattr:
                    override_dict = self._setattr_overrides.setdefault(
                        canonical_name, {}
                    )
                else:
                    override_dict = self._getattr_overrides.setdefault(
                        canonical_name, {}
                    )
                # A single name is allowed as shorthand for a 1-tuple.
                if isinstance(obj_funcs, str):
                    obj_funcs = (obj_funcs,)
                for name in obj_funcs:
                    if name in override_dict:
                        raise ValueError(
                            "%s was already overridden for %s" % (name, obj_name)
                        )
                    override_dict[name] = override.func
        if isinstance(override, LocalExceptionDeserializer):
            canonical_name = get_canonical_name(override.class_path, self._aliases)
            if canonical_name not in self._exception_hierarchy:
                raise ValueError(
                    "%s does not refer to an exception type" % override.class_path
                )
            # At most one deserializer per exception type.
            cur_des = self._exception_deserializers.get(canonical_name, None)
            if cur_des is not None:
                raise ValueError(
                    "Exception %s has multiple deserializers" % override.class_path
                )
            self._exception_deserializers[canonical_name] = override.deserializer
    # Proxied standalone functions are functions that are proxied
    # as part of other objects like defaultdict for which we create a
    # on-the-fly simple class that is just a callable. This is therefore
    # a special type of self._proxied_classes
    self._proxied_standalone_functions = {}
    # Raw export listing kept around for introspection/debugging.
    self._export_info = {
        "classes": response[FIELD_CONTENT]["classes"],
        "functions": response[FIELD_CONTENT]["functions"],
        "values": response[FIELD_CONTENT]["values"],
        "exceptions": response[FIELD_CONTENT]["exceptions"],
        "aliases": response[FIELD_CONTENT]["aliases"],
    }