in metaflow/plugins/env_escape/server.py [0:0]
def __init__(self, config_dir, max_pickle_version):
    """Set up the env-escape server from the configuration in *config_dir*.

    Imports the ``server_mappings`` module (exported/proxied objects) and the
    optional ``overrides`` module from the configuration package, builds the
    alias tables, the override dispatch tables and the exception serializers,
    topologically sorts the exported exceptions so the client can recreate
    them parents-first, and registers the per-opcode message handlers.

    Parameters
    ----------
    config_dir : str
        Package name from which ``.server_mappings`` and ``.overrides`` are
        imported relatively (mirroring how the client imports them).
    max_pickle_version : int
        Highest pickle protocol the client supports; also installed as the
        data transferer's default protocol.

    Raises
    ------
    RuntimeError
        If ``server_mappings`` cannot be imported, or ``overrides`` exists
        but fails to import for a reason other than ``ImportError``.
    ValueError
        On inconsistent or circular aliases, overrides targeting unknown
        objects, duplicate overrides/serializers, or exported exceptions
        that are not (transitively) rooted in a builtin exception.
    """
    self._max_pickle_version = data_transferer.defaultProtocol = max_pickle_version
    try:
        mappings = importlib.import_module(".server_mappings", package=config_dir)
    except Exception as e:
        # Chain the cause so the original import failure stays debuggable
        raise RuntimeError(
            "Cannot import server_mappings from '%s': %s" % (sys.path[0], str(e))
        ) from e
    try:
        # Import module as a relative package to make sure that it is consistent
        # with how the client does it -- this enables us to do the same type of
        # relative imports in overrides
        override_module = importlib.import_module(".overrides", package=config_dir)
        override_values = override_module.__dict__.values()
    except ImportError:
        # We ignore so the file can be non-existent if not needed
        override_values = []
    except Exception as e:
        raise RuntimeError(
            "Cannot import overrides from '%s': %s" % (sys.path[0], str(e))
        ) from e
    self._aliases = {}
    # Flatten the exported mappings; a1..a4 carry the alias information
    # (base_name -> iterable of aliases) gathered while flattening.
    self._known_classes, a1 = self._flatten_dict(mappings.EXPORTED_CLASSES)
    self._class_types_to_names = {v: k for k, v in self._known_classes.items()}
    self._known_funcs, a2 = self._flatten_dict(mappings.EXPORTED_FUNCTIONS)
    self._known_vals, a3 = self._flatten_dict(mappings.EXPORTED_VALUES)
    self._known_exceptions, a4 = self._flatten_dict(mappings.EXPORTED_EXCEPTIONS)
    self._proxied_types = {
        "%s.%s" % (t.__module__, t.__name__): t for t in mappings.PROXIED_CLASSES
    }
    self._class_types_to_names.update(
        {v: k for k, v in self._proxied_types.items()}
    )
    # We will also proxy functions from objects as needed. This is useful
    # for defaultdict for example since the `default_factory` function is a
    # lambda that needs to be transferred.
    self._class_types_to_names[type(lambda x: x)] = "function"
    # Update all alias information
    for base_name, aliases in itertools.chain(
        a1.items(), a2.items(), a3.items(), a4.items()
    ):
        for alias in aliases:
            a = self._aliases.setdefault(alias, base_name)
            if a != base_name:
                # Technically we could have a that aliases b and b that aliases c
                # and then a that aliases c. This would error out in that case
                # even though it is valid. It is easy for the user to get around
                # this by listing aliases in the same order so we don't support
                # it for now.
                raise ValueError(
                    "%s is an alias to both %s and %s -- make sure all aliases "
                    "are listed in the same order" % (alias, base_name, a)
                )
    # Detect circular aliases. If a user lists ("a", "b") and then ("b", "a"), we
    # will have an entry in aliases saying b is an alias for a and a is an alias
    # for b which is a recipe for disaster since we no longer have a cannonical name
    # for things.
    for alias, base_name in self._aliases.items():
        if base_name in self._aliases:
            raise ValueError(
                "%s and %s are circular aliases -- make sure all aliases "
                "are listed in the same order" % (alias, base_name)
            )
    # Determine if we have any overrides
    self._overrides = {}
    self._getattr_overrides = {}
    self._setattr_overrides = {}
    self._exception_serializers = {}
    for override in override_values:
        if isinstance(override, (RemoteAttrOverride, RemoteOverride)):
            for obj_name, obj_funcs in override.obj_mapping.items():
                canonical_name = get_canonical_name(obj_name, self._aliases)
                obj_type = self._known_classes.get(
                    canonical_name, self._proxied_types.get(obj_name)
                )
                if obj_type is None:
                    raise ValueError(
                        "%s does not refer to a proxied or exported type" % obj_name
                    )
                # Pick the dispatch table this override belongs to
                if isinstance(override, RemoteOverride):
                    override_dict = self._overrides.setdefault(obj_type, {})
                elif override.is_setattr:
                    override_dict = self._setattr_overrides.setdefault(obj_type, {})
                else:
                    override_dict = self._getattr_overrides.setdefault(obj_type, {})
                if isinstance(obj_funcs, str):
                    obj_funcs = (obj_funcs,)
                for name in obj_funcs:
                    if name in override_dict:
                        raise ValueError(
                            "%s was already overridden for %s" % (name, obj_name)
                        )
                    override_dict[name] = override.func
        elif isinstance(override, RemoteExceptionSerializer):
            canonical_name = get_canonical_name(override.class_path, self._aliases)
            if canonical_name not in self._known_exceptions:
                raise ValueError(
                    "%s does not refer to an exported exception"
                    % override.class_path
                )
            # The serializer table is keyed by canonical name, so the
            # duplicate check must use the canonical name as well --
            # otherwise a second serializer registered under an alias of an
            # already-covered exception would silently clobber the first.
            if canonical_name in self._exception_serializers:
                raise ValueError(
                    "%s exception serializer already defined" % override.class_path
                )
            self._exception_serializers[canonical_name] = override.serializer
    # Process the exceptions making sure we have all the ones we need and building a
    # topologically sorted list for the client to instantiate
    name_to_parent_count = {}
    name_to_parents = {}
    parent_to_child = {}
    for ex_name, ex_cls in self._known_exceptions.items():
        ex_name_canonical = get_canonical_name(ex_name, self._aliases)
        parents = []
        # Walk the MRO (skipping the class itself) collecting exported
        # ancestors until we reach the builtin exception that roots it.
        for base in ex_cls.__mro__[1:]:
            if base is object:
                raise ValueError(
                    "Exported exceptions not rooted in a builtin exception "
                    "are not supported: %s." % ex_name
                )
            if base.__module__ == "builtins":
                # We found our base exception
                parents.append("builtins." + base.__name__)
                break
            else:
                fqn = ".".join([base.__module__, base.__name__])
                canonical_fqn = get_canonical_name(fqn, self._aliases)
                if canonical_fqn in self._known_exceptions:
                    parents.append(canonical_fqn)
                    children = parent_to_child.setdefault(canonical_fqn, [])
                    children.append(ex_name_canonical)
                else:
                    raise ValueError(
                        "Exported exception %s has non exported and non builtin parent "
                        "exception: %s (%s). Known exceptions: %s."
                        % (ex_name, fqn, canonical_fqn, str(self._known_exceptions))
                    )
        # The last entry of `parents` is always the builtin root, so the
        # in-degree for the topological sort counts exported parents only.
        name_to_parent_count[ex_name_canonical] = len(parents) - 1
        name_to_parents[ex_name_canonical] = parents
    # We now form the exceptions and put them in self._known_exceptions in
    # the proper order (topologically)
    self._known_exceptions = []
    # Find roots (exceptions whose only parent is a builtin)
    to_process = []
    for name, count in name_to_parent_count.items():
        if count == 0:
            to_process.append(name)
    # Topologically process the exceptions (Kahn's algorithm, level by level)
    while to_process:
        next_round = []
        for name in to_process:
            self._known_exceptions.append((name, name_to_parents[name]))
            del name_to_parent_count[name]
            for child in parent_to_child.get(name, []):
                cur_child_count = name_to_parent_count[child]
                if cur_child_count == 1:
                    # Last outstanding parent -- child is ready next round
                    next_round.append(child)
                else:
                    name_to_parent_count[child] = cur_child_count - 1
        to_process = next_round
    if name_to_parent_count:
        # Anything left still has an unprocessed parent (cycle/missing root)
        raise ValueError(
            "Badly rooted exceptions: %s" % ", ".join(name_to_parent_count.keys())
        )
    self._active = False
    self._channel = None
    self._datatransferer = DataTransferer(self)
    # Dispatch table mapping each wire opcode to its handler method
    self._handlers = {
        OP_GETATTR: self._handle_getattr,
        OP_SETATTR: self._handle_setattr,
        OP_DELATTR: self._handle_delattr,
        OP_CALL: self._handle_call,
        OP_CALLATTR: self._handle_callattr,
        OP_REPR: self._handle_repr,
        OP_STR: self._handle_str,
        OP_HASH: self._handle_hash,
        OP_PICKLE: self._handle_pickle,
        OP_DEL: self._handle_del,
        OP_GETMETHODS: self._handle_getmethods,
        OP_DIR: self._handle_dir,
        OP_CALLFUNC: self._handle_callfunc,
        OP_CALLONCLASS: self._handle_callonclass,
        OP_GETVAL: self._handle_getval,
        OP_SETVAL: self._handle_setval,
        OP_INIT: self._handle_init,
        OP_SUBCLASSCHECK: self._handle_subclasscheck,
    }
    # id(obj) -> live local object held on behalf of the client
    self._local_objects = {}