in muss/utils/helpers.py [0:0]
def redirect_streams(source_streams, target_streams):
    """Temporarily forward writes and flushes on source streams to target streams.

    This is a generator with a single ``yield``. While it is suspended at that
    ``yield``, every stream in ``source_streams`` has its ``write`` and
    ``flush`` attributes replaced so that each call is forwarded to *all* of
    the ``target_streams``. When the generator is resumed, closed, or an
    exception is thrown into it, the attributes captured before patching are
    assigned back.

    NOTE(review): presumably this is meant to be driven as a context manager
    (e.g. wrapped by ``contextlib.contextmanager``); the decorator is not
    visible in this excerpt -- confirm against the full file.

    Args:
        source_streams: Iterable of objects exposing assignable ``write`` and
            ``flush`` attributes. One-shot iterables are accepted.
        target_streams: Iterable of objects exposing ``write`` and ``flush``.
            A stream may appear in both arguments; in that case its original
            (unpatched) methods are used as the target.

    Yields:
        None, exactly once, while the redirection is active.
    """
    # Both arguments are traversed several times below. Materialize them so a
    # one-shot iterable (generator, ``map`` object, ...) is not exhausted after
    # the first pass, which would silently leave the streams unpatched.
    source_streams = list(source_streams)
    target_streams = list(target_streams)

    # Capture the target methods beforehand in case a target stream is also a
    # source stream: otherwise we would later pick up the patched write/flush,
    # leading to infinite recursion.
    target_writes = [stream.write for stream in target_streams]
    target_flushes = [stream.flush for stream in target_streams]

    def patched_write(self, message):
        # Fan the message out to every target; like the original patch, this
        # returns None rather than a character count.
        for target_write in target_writes:
            target_write(message)

    def patched_flush(self):
        for target_flush in target_flushes:
            target_flush()

    # Remember what each source stream exposed before patching so the
    # ``finally`` clause can put it back.
    originals = [(stream, stream.write, stream.flush) for stream in source_streams]
    try:
        for stream in source_streams:
            stream.write = MethodType(patched_write, stream)
            stream.flush = MethodType(patched_flush, stream)
        yield
    finally:
        for stream, original_write, original_flush in originals:
            stream.write = original_write
            stream.flush = original_flush