in core/__init__.py [0:0]
def bind(cls, stack, config, state, logger):
# define a new sub-type of the user's context, make it concrete or it cannot be instantiated
sub = type(cls.__name__, (cls,), {"handle_param": None, "logger": logger})
bindings = {}
for name, ann in cls.__annotations__.items():
if isinstance(ann, type):
if issubclass(ann, Pipe.Context):
nested = ann.bind(stack, config, state, logger)
setattr(sub, name, nested)
continue
args = get_args(ann)
for i, ann in enumerate(args):
if isinstance(ann, Pipe.Node):
default = getattr(cls, name, NoDefault)
param = Pipe.Node.Param(name, args[0], default, NoDefault)
binding, getter, setter = ann.handle_param(param, config, state, logger)
setattr(sub, name, property(getter, setter))
bindings[name] = binding
try:
getter(None)
except KeyError as e:
raise Error(e.args[0])
setattr(sub, "__pipe_ctx_bindings__", bindings)
return stack.enter_context(sub())