metaflow/extension_support/plugins.py (175 lines of code) (raw):
import importlib
import traceback
from metaflow.metaflow_config_funcs import from_conf
from . import _ext_debug, alias_submodules, get_modules, lazy_load_aliases
def process_plugins(module_globals):
_resolve_relative_paths(module_globals)
# Set ENABLED_ and _TOGGLE_ variables. The ENABLED_* variables are read from
# configuration and the _TOGGLE_* variables are initialized to empty lists to be
# appended to from the extensions.
for plugin_category in _plugin_categories:
upper_category = plugin_category.upper()
globals()["ENABLED_%s" % upper_category] = from_conf(
"ENABLED_%s" % upper_category
)
globals()["_TOGGLE_%s" % upper_category] = []
# Initialize the list of available plugins to what is available in Metaflow core
globals()[_list_for_category(plugin_category)] = _get_ext_plugins(
module_globals, plugin_category
)
try:
modules_to_import = get_modules("plugins")
# This is like multiload_all but we load globals independently since we just care
# about the TOGGLE and ENABLED values
for m in modules_to_import:
lazy_load_aliases(
alias_submodules(m.module, m.tl_package, "plugins", extra_indent=True)
)
for n, o in m.module.__dict__.items():
if n.startswith("TOGGLE_") and n[7:].lower() in _plugin_categories:
# Extensions append to the TOGGLE list
globals()["_TOGGLE_%s" % n[7:]].extend(o)
elif n.startswith("ENABLED_") and n[8:].lower() in _plugin_categories:
# Extensions override the ENABLED_ setting.
globals()[n] = o
_resolve_relative_paths(m.module.__dict__)
for plugin_category in _plugin_categories:
# Collect all the plugins present
globals()[_list_for_category(plugin_category)].extend(
_get_ext_plugins(m.module.__dict__, plugin_category)
)
except Exception as e:
_ext_debug("\tWARNING: ignoring all plugins due to error during import: %s" % e)
print(
"WARNING: Plugins did not load -- ignoring all of them which may not "
"be what you want: %s" % e
)
traceback.print_exc()
# At this point, we have _all_<category>s populated with all the tuples
# (name, module_class) from all the plugins in all the extensions (if any)
# We build a dictionary taking the latest presence for each name (so plugins
# override metaflow core)
for plugin_category in _plugin_categories:
upper_category = plugin_category.upper()
d = globals()[_dict_for_category(plugin_category)] = {}
for name, class_path in globals()["_all_%ss" % plugin_category]:
_ext_debug(
" Adding %s '%s' from '%s'" % (plugin_category, name, class_path)
)
d[name] = class_path
# Resolve all the ENABLED_* variables. The rules are the following:
# - if ENABLED_* is non None, it means it was either set directly by the user
# in a configuration file, on the command line or by an extension. In that case
# we honor those wishes and completely ignore the extensions' toggles.
# - if ENABLED_* is None, we populate it with everything included here and in
# all the extensions and use the TOGGLE_ list to produce the final list.
# The rationale behind this is to support both a configuration option where the
# plugins enabled are explicitly listed (typical in a lot of software) but also to
# support a "configuration-less" version where the installation of the extensions
# determines what is activated.
if globals()["ENABLED_%s" % upper_category] is None:
globals()["ENABLED_%s" % upper_category] = (
list(d) + globals()["_TOGGLE_%s" % upper_category]
)
def merge_lists(base, overrides, attr):
# Merge two lists of classes by comparing them for equality using 'attr'.
# This function prefers anything in 'overrides'. In other words, if a class
# is present in overrides and matches (according to the equality criterion) a class in
# base, it will be used instead of the one in base.
l = list(overrides)
existing = set([getattr(o, attr) for o in overrides])
l.extend([d for d in base if getattr(d, attr) not in existing])
base[:] = l[:]
def get_plugin(category, class_path, name):
path, cls_name = class_path.rsplit(".", 1)
try:
plugin_module = importlib.import_module(path)
except ImportError as e:
raise ValueError(
"Cannot locate %s plugin '%s' at '%s'" % (category, name, path)
) from e
cls = getattr(plugin_module, cls_name, None)
if cls is None:
raise ValueError(
"Cannot locate '%s' class for %s plugin at '%s'"
% (cls_name, category, path)
)
extracted_name = get_plugin_name(category, cls)
if extracted_name and extracted_name != name:
raise ValueError(
"Class '%s' at '%s' for %s plugin expected to be named '%s' but got '%s'"
% (cls_name, path, category, name, extracted_name)
)
globals()[cls_name] = cls
_ext_debug(" Added %s plugin '%s' from '%s'" % (category, name, class_path))
return cls
def resolve_plugins(category, path_only=False):
# Called to return a list of classes that are the available plugins for 'category'
# The ENABLED_<category> variable is set in process_plugins
# based on all the plugins that are found; it can contain either names of
# plugins or -/+<name_of_plugin> indicating a "toggle" to activate/de-activate
# a plugin.
list_of_plugins = globals()["ENABLED_%s" % category.upper()]
_ext_debug(" Resolving %s plugins" % category)
_ext_debug(" Raw list of plugins is: %s" % str(list_of_plugins))
set_of_plugins = set()
for p in list_of_plugins:
if p.startswith("-"):
set_of_plugins.discard(p[1:])
elif p.startswith("+"):
set_of_plugins.add(p[1:])
else:
set_of_plugins.add(p)
available_plugins = globals()[_dict_for_category(category)]
name_extractor = _plugin_categories[category]
if path_only or not name_extractor:
# If we have no name function, it means we just use the name in the dictionary
# and we return a dictionary. This is for sidecars mostly as they do not have
# a field that indicates their name
to_return = {}
else:
to_return = []
_ext_debug(" Resolved list of plugins is: %s" % str(set_of_plugins))
# Various error checks to make sure the plugin exists -- basically converts a string
# representing a class path to the actual class. We try to give useful messages
# in case of errors.
for name in set_of_plugins:
class_path = available_plugins.get(name, None)
if class_path is None:
raise ValueError(
"Configuration requested %s plugin '%s' but no such plugin is available"
% (category, name)
)
if path_only:
to_return[name] = class_path
else:
if name_extractor is not None:
to_return.append(get_plugin(category, class_path, name))
else:
to_return[name] = get_plugin(category, class_path, name)
return to_return
# Some plugins do not have a field in them indicating their name.
# This is the case for sidecars.
# All other plugins contain a field that indicates their name.
# _plugin_categories contains all the types of plugins and, for ones that have
# a field indicating their name,
# an additional function indicating how to extract the name of the plugin is provided.
# key is the type of plugin
# value is either:
# - a function to extract the name of the plugin from the plugin itself
# - None if this is a plugin with no field for its name
_plugin_categories = {
"step_decorator": lambda x: x.name,
"flow_decorator": lambda x: x.name,
"environment": lambda x: x.TYPE,
"metadata_provider": lambda x: x.TYPE,
"datastore": lambda x: x.TYPE,
"dataclient": lambda x: x.TYPE,
"secrets_provider": lambda x: x.TYPE,
"gcp_client_provider": lambda x: x.name,
"deployer_impl_provider": lambda x: x.TYPE,
"azure_client_provider": lambda x: x.name,
"sidecar": None,
"logging_sidecar": None,
"monitor_sidecar": None,
"aws_client_provider": lambda x: x.name,
"cli": lambda x: (
list(x.commands)[0] if len(x.commands) == 1 else "too many commands"
),
"runner_cli": lambda x: x.name,
"tl_plugin": None,
}
def get_plugin_name(category, plugin):
extractor = _plugin_categories[category]
if extractor:
return extractor(plugin)
return None
def _list_for_category(category):
# Convenience function to name the variable containing List[Tuple[str, str]] where
# each tuple contains:
# - the name of the plugin
# - the classpath of the plugin
return "_all_%ss" % category
def _dict_for_category(category):
# Convenience function to name the variable containing the same thing as
# _list_for_category except that it is now in dict form where the key is the name
# of the plugin
return "_all_%ss_dict" % category
def _get_ext_plugins(module_globals, category):
# Convenience function to get the list of Tuple[str, str] describing the plugins
# available from the extension. This defaults to [] so not all plugins need to be
# listed.
return module_globals.get("%sS_DESC" % category.upper(), [])
def _set_ext_plugins(module_globals, category, val):
module_globals["%sS_DESC" % category.upper()] = val
def _resolve_relative_paths(module_globals):
# We want to modify all the relevant lists so that the relative paths
# are made fully qualified paths for the modules
pkg_path = module_globals["__package__"]
pkg_components = pkg_path.split(".")
def resolve_path(class_path):
# Converts a relative class_path to an absolute one considering that the
# relative class_path is present in a package pkg_path
if class_path[0] == ".":
i = 1
# Check for multiple "." at the start of the class_path
while class_path[i] == ".":
i += 1
if i > len(pkg_components):
raise ValueError(
"Path '%s' exits out of Metaflow module at %s"
% (class_path, pkg_path)
)
return (
".".join(pkg_components[: -i + 1] if i > 1 else pkg_components)
+ class_path[i - 1 :]
)
return class_path
for plugin_category in _plugin_categories:
_set_ext_plugins(
module_globals,
plugin_category,
list(
map(
lambda p: (p[0], resolve_path(p[1])),
_get_ext_plugins(module_globals, plugin_category),
)
),
)