in src/hpc/autoscale/job/demandprinter.py [0:0]
def wrap_text_io(clz: Any) -> Callable[[TextIO, Optional[str]], TextIO]:
members: Dict[str, Any] = {}
for attr in dir(TextIO):
if not attr[0].islower() and attr not in [
"__enter__",
"__exit__",
"__iter__",
"__next__",
]:
continue
if attr in dir(clz):
continue
def make_member(mem_name: str) -> Any:
is_function = inspect.isfunction(getattr(TextIO, mem_name))
if is_function:
return lambda *args: getattr(args[0].wrapped, mem_name)(*args[1:])
else:
return property(lambda *args: getattr(args[0].wrapped, mem_name))
members[attr] = make_member(attr)
return type("LoggingStream", (clz,), members)