def _init_logs()

in smallpond/execution/task.py [0:0]


    def _init_logs(self, exec_id: str, capture_stdout_stderr: bool = False) -> None:
        """Reset loguru and register the console, file and user-log handlers.

        Every existing loguru handler is removed first. Then, in order:

        * a console handler on stdout at ``self.console_log_level``;
        * a file handler on ``<log_root>/<exec_id>.log`` and, when
          ``self.shared_log_root`` is set, on ``<shared_log_root>/<exec_id>.log``
          (that directory is created if missing), at ``self.file_log_level``;
        * a handler on ``<log_root>/<exec_id>-user.log`` using the dict filter
          ``{"": self.file_log_level, "smallpond": False}`` (under loguru's
          dict-filter semantics this drops records from the ``smallpond``
          package);
        * the standard ``logging`` root logger is reconfigured (``force=True``)
          with ``InterceptHandler`` as its only handler, at INFO level.

        File handlers rotate at 100 MB and keep 5 files unless
        ``self.disable_log_rotation`` is set.

        Args:
            exec_id: Identifier embedded in every log line and used as the stem
                of the log file names.
            capture_stdout_stderr: When true, ``sys.stdout`` and ``sys.stderr``
                are replaced by objects that emit each written line as an
                INFO-level log record.
        """
        log_rotation = {} if self.disable_log_rotation else {"rotation": "100 MB", "retention": 5}
        log_file_paths = [os.path.join(self.log_root, f"{exec_id}.log")]
        user_log_only = {"": self.file_log_level, "smallpond": False}
        user_log_path = os.path.join(self.log_root, f"{exec_id}-user.log")
        # create shared log dir
        if self.shared_log_root is not None:
            os.makedirs(self.shared_log_root, exist_ok=True)
            log_file_paths.append(os.path.join(self.shared_log_root, f"{exec_id}.log"))

        # remove existing handlers
        logger.remove()

        # exec_id becomes part of a loguru format template; double its braces so
        # an id containing "{" or "}" is printed verbatim instead of being
        # parsed as a format field.
        exec_label = exec_id.replace("{", "{{").replace("}", "}}")
        format_str = f"[{{time:%Y-%m-%d %H:%M:%S.%f}}] [{exec_label}] [{{process.name}}({{process.id}})] [{{file}}:{{line}}] {{level}} {{message}}"
        # options shared by every handler registered below
        handler_opts = {
            "format": format_str,
            "colorize": False,
            "enqueue": True,
            "backtrace": False,
        }

        # If an earlier call already redirected stdout, log to the stream that
        # was replaced: registering the redirecting wrapper itself as a sink
        # would feed every record straight back into the logger.
        console_stream = getattr(sys.stdout, "_captured_stream", sys.stdout)

        # register stdout log handler
        logger.add(console_stream, level=self.console_log_level, **handler_opts)
        # register file log handlers
        for log_path in log_file_paths:
            logger.add(log_path, level=self.file_log_level, **handler_opts, **log_rotation)
            logger.info(f"initialized logging to file: {log_path}")
        # register user log handler
        logger.add(
            user_log_path,
            level=self.file_log_level,
            filter=user_log_only,
            **handler_opts,
            **log_rotation,
        )
        logger.info(f"initialized user logging to file: {user_log_path}")
        # intercept messages from logging
        logging.basicConfig(handlers=[InterceptHandler()], level=logging.INFO, force=True)

        if not capture_stdout_stderr:
            return

        # capture stdout/stderr as INFO level
        # https://loguru.readthedocs.io/en/stable/resources/recipes.html#capturing-standard-stdout-stderr-and-warnings
        class StreamToLogger(io.TextIOBase):
            """Text stream that forwards each written line to loguru."""

            def __init__(self, captured_stream, level="INFO"):
                super().__init__()
                # remember the stream being replaced so that a later
                # _init_logs() call can still reach the real console
                self._captured_stream = captured_stream
                self._level = level

            def write(self, buffer):
                # depth=1 attributes the record to the code that called write()
                for line in buffer.rstrip().splitlines():
                    logger.opt(depth=1).log(self._level, line.rstrip())
                # io.TextIOBase.write() is specified to return the number of
                # characters written
                return len(buffer)

            def flush(self):
                pass

        sys.stdout = StreamToLogger(console_stream)
        sys.stderr = StreamToLogger(getattr(sys.stderr, "_captured_stream", sys.stderr))