in migration/src/common.py [0:0]
def _log_listener_process(queue: multiprocessing.Queue, path: Path):
    """Drain log records from *queue* and emit them to a file and the console.

    Runs as the target of the listener process created by ``log_listener``.
    It is defined at module level (not nested) so that it can be pickled when
    the ``spawn`` start method is in use.

    Args:
        queue: Queue carrying ``logging.LogRecord`` objects; a ``None`` item
            is the sentinel that stops the loop.
        path: Log file that receives every record at DEBUG level and above.
    """
    file_handler = logging.FileHandler(path)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(LOGGING_FOMATTER)

    # The console only shows INFO and above; the file keeps everything.
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(LOGGING_FOMATTER)

    root = logging.getLogger()
    handlers = (file_handler, console_handler)
    for handler in handlers:
        root.addHandler(handler)

    try:
        while True:
            try:
                record = queue.get()
                if record is None:  # sentinel
                    break
                # Dispatch through the logger named in the record so the
                # record propagates up to the root handlers added above.
                logging.getLogger(record.name).handle(record)
            except Exception:
                # Best effort: a bad record must not kill the listener.
                import sys
                import traceback

                print('Whoops! Problem:', file=sys.stderr)
                traceback.print_exc(file=sys.stderr)
    finally:
        # Detach and close the handlers so the log file is released cleanly.
        for handler in handlers:
            root.removeHandler(handler)
            handler.close()


def log_listener(log_dir: Path, name: str) -> tuple[multiprocessing.Process, multiprocessing.Queue]:
    """Create (but do not start) a process that writes queued log records.

    The log file is ``<log_dir>/<name>_<ISO timestamp>.log``. ``log_dir`` is
    created, including missing parents, if it does not exist yet.

    Args:
        log_dir: Directory in which the log file is created.
        name: Prefix of the log file name.

    Returns:
        ``(listener, queue)``: the unstarted listener process and the queue
        it reads from. Putting ``None`` on the queue makes the listener exit.
    """
    # exist_ok avoids the check-then-create race when several callers set up
    # logging at once; parents=True tolerates a not-yet-created parent.
    log_dir.mkdir(parents=True, exist_ok=True)

    # NOTE: the ISO timestamp contains ':' characters, which are not valid in
    # Windows file names; the format is kept for backward compatibility.
    path = log_dir.joinpath(f'{name}_{datetime.now().isoformat(timespec="seconds")}.log')
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=_log_listener_process, args=(queue, path))
    return (listener, queue)