in core/__init__.py [0:0]
def run(self, config, state, dry_run, logger, exit_stack):
from inspect import signature
sync_logger_config(self.logger, config)
params = signature(self.func).parameters
if not dry_run:
logger.debug(f"executing pipe '{self.name}'...")
elif "dry_run" in params:
logger.debug(f"dry executing pipe '{self.name}'...")
else:
logger.debug(f"not executing pipe '{self.name}'...")
with ExitStack() as stack:
kwargs = {}
for name, param in params.items():
if name == "dry_run":
kwargs["dry_run"] = dry_run
continue
if isinstance(param.annotation, type):
if issubclass(param.annotation, Pipe):
kwargs[name] = self
elif issubclass(param.annotation, logging.Logger):
kwargs[name] = self.logger
elif issubclass(param.annotation, ExitStack):
kwargs[name] = exit_stack
elif issubclass(param.annotation, Pipe.Context):
kwargs[name] = param.annotation.bind(stack, config, state, logger)
continue
args = get_args(param.annotation)
for ann in args:
if isinstance(ann, Pipe.Node):
param = Pipe.Node.Param(name, args[0], param.default, param.empty)
_, getter, _ = ann.handle_param(param, config, state, logger)
try:
kwargs[name] = getter(None)
except KeyError as e:
raise Error(e.args[0])
if not dry_run or "dry_run" in kwargs:
return self.func(**kwargs)