in airflow-core/src/airflow/plugins_manager.py [0:0]
def get_plugin_info(attrs_to_dump: Iterable[str] | None = None) -> list[dict[str, Any]]:
    """
    Dump plugins attributes.

    Before any attribute is read, ``ensure_plugins_loaded`` and the
    ``integrate_*`` / ``initialize_*`` plugin helpers are called.

    :param attrs_to_dump: A list of plugin attributes to dump. Any iterable is
        accepted, including one-shot iterators such as generators. When it is
        ``None`` or yields nothing, ``PLUGINS_ATTRIBUTES_TO_DUMP`` is used.
    :return: One dict per entry in ``plugins``, holding the plugin ``name`` plus
        one key per requested attribute. An empty list is returned when
        ``plugins`` is empty or ``None``.
    """
    ensure_plugins_loaded()
    integrate_macros_plugins()
    initialize_flask_plugins()
    initialize_fastapi_plugins()
    initialize_extra_operators_links_plugins()

    # Materialize once: the attribute names are walked again for every plugin, so a
    # one-shot iterator would otherwise be exhausted after the first plugin and all
    # later plugins would be reported with nothing but their name.
    attrs = tuple(attrs_to_dump or ())
    if not attrs:
        attrs = tuple(PLUGINS_ATTRIBUTES_TO_DUMP)

    plugins_info: list[dict[str, Any]] = []
    # ``plugins`` may be falsy (e.g. None or empty); treat that as "nothing to dump".
    for plugin in plugins or ():
        info: dict[str, Any] = {"name": plugin.name}
        for attr in attrs:
            value = getattr(plugin, attr)
            if attr in ("global_operator_extra_links", "operator_extra_links"):
                # Link objects are shown by the qualified name of their class only.
                info[attr] = [f"<{qualname(d.__class__)} object>" for d in value]
            elif attr in ("macros", "timetables", "priority_weight_strategies"):
                info[attr] = [qualname(d) for d in value]
            elif attr == "listeners":
                # listeners may be modules or class instances
                info[attr] = [d.__name__ if inspect.ismodule(d) else qualname(d) for d in value]
            elif attr == "appbuilder_views":
                # Copy each view dict, swapping the view object for the qualified name
                # of its class; "view" is set to None when the key is absent.
                info[attr] = [
                    {**d, "view": qualname(d["view"].__class__) if "view" in d else None} for d in value
                ]
            elif attr == "flask_blueprints":
                info[attr] = [
                    f"<{qualname(d.__class__)}: name={d.name!r} import_name={d.import_name!r}>"
                    for d in value
                ]
            elif attr == "fastapi_apps":
                # Same treatment as appbuilder_views, keyed on "app".
                info[attr] = [
                    {**d, "app": qualname(d["app"].__class__) if "app" in d else None} for d in value
                ]
            elif attr == "fastapi_root_middlewares":
                # remove args and kwargs from plugin info to hide potentially sensitive info.
                info[attr] = [
                    {
                        k: (qualname(v) if k == "middleware" else v)
                        for k, v in middleware_dict.items()
                        if k not in ("args", "kwargs")
                    }
                    for middleware_dict in value
                ]
            else:
                # Any other attribute is reported as-is.
                info[attr] = value
        plugins_info.append(info)
    return plugins_info