client/log/log.py (272 lines of code) (raw):
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import contextlib
import copy
import io
import logging
import logging.handlers
import re
import subprocess
import sys
import threading
import time
from pathlib import Path
from types import TracebackType
from typing import Generator, Iterable, Optional, Pattern, Sequence
import click
PERFORMANCE: int = 15
PROMPT: int = 50
SUCCESS: int = 60
LOG: logging.Logger = logging.getLogger(__name__)
stdout: io.StringIO = io.StringIO(newline="")
__handler: Optional["TimedStreamHandler"] = None
class Color:
YELLOW: str = "\033[33m"
RED: str = "\033[31m"
GREEN: str = "\033[32m"
class Format:
BOLD: str = "\033[1m"
CLEAR_LINE: str = "\x1b[0G\x1b[K"
CLEAR: str = "\033[0m"
TRUNCATE_OVERFLOW: str = "\033[?7l"
WRAP_OVERFLOW: str = "\033[?7h"
NEWLINE: str = "\n"
CURSOR_UP_LINE: str = "\x1b[1A"
HIDE_CURSOR: str = "\x1b[?25l"
SHOW_CURSOR: str = "\x1b[?25h"
class Character:
LAMBDA: str = "ƛ"
class SectionFormatter(logging.Formatter):
def __init__(self) -> None:
super(SectionFormatter, self).__init__(
"%(asctime)s [PID %(process)d] %(levelname)s %(message)s"
)
def format(self, record: logging.LogRecord) -> str:
formatted = super(SectionFormatter, self).format(record)
return re.sub(r"DEBUG \[(.*)\]", r"\1", formatted)
# pyre-fixme[24]: Generic type `logging.StreamHandler` expects 1 type parameter.
class TimedStreamHandler(logging.StreamHandler):
THRESHOLD: float = 0.5
LINE_BREAKING_LEVELS: Sequence[str] = ["ERROR", "WARNING", "SUCCESS"]
_terminate: bool = False
_last_update: float = 0.0
def __init__(self) -> None:
super(TimedStreamHandler, self).__init__()
self.setFormatter(logging.Formatter("%(message)s"))
self.terminator: str = ""
self.setLevel(logging.INFO)
self._record: Optional[logging.LogRecord] = None
self._active_lines: int = 0
# Preamble preparing terminal.
click.echo(
Format.NEWLINE
+ Format.CLEAR_LINE
+ Format.CURSOR_UP_LINE
+ Format.HIDE_CURSOR,
file=sys.stderr,
nl=False,
)
thread = threading.Thread(target=self._thread)
thread.daemon = True
thread.start()
def clear_lines(self) -> str:
if self._active_lines == 0:
return ""
return Format.CLEAR_LINE + "".join(
[
Format.CURSOR_UP_LINE + Format.CLEAR_LINE
for n in range(self._active_lines - 1)
]
)
def emit(self, record: logging.LogRecord, age: Optional[float] = None) -> None:
suffix = ""
color: Optional[str] = None
message = record.msg
active_lines = message.count("\n") + 1
truncate = Format.TRUNCATE_OVERFLOW
if record.levelname in self.LINE_BREAKING_LEVELS:
message += "\n"
if record.levelname == "ERROR":
color = "red"
self._record = None
active_lines = 0
truncate = Format.WRAP_OVERFLOW
elif record.levelname == "WARNING":
color = "yellow"
self._record = None
active_lines = 0
truncate = Format.WRAP_OVERFLOW
elif record.levelname == "PROMPT":
color = "yellow"
self._record = None
active_lines = 0
truncate = Format.WRAP_OVERFLOW
elif record.levelname == "SUCCESS":
self._record = None
active_lines = 0
truncate = Format.WRAP_OVERFLOW
elif age:
if age > 10:
color = "yellow"
if age > 30:
color = "red"
suffix = click.style(f" [{age:.1f}s]", fg=color)
else:
self._record = record
self._last_update = time.time()
prompt = click.style(f"{Character.LAMBDA}", fg=color)
new_message = f"{self.clear_lines()}{prompt} {truncate}{message}{suffix}"
timed_record = copy.copy(record)
timed_record.msg = (
f"{click.unstyle(new_message)}\n"
# pyre-ignore[16]: Missing typeshed stub for this API
if click.utils.should_strip_ansi(stream=sys.stderr)
else new_message
)
self._active_lines = active_lines
super(TimedStreamHandler, self).emit(timed_record)
def _thread(self) -> None:
while not self._terminate:
record = self._record
if record:
age = time.time() - self._last_update
if age > self.THRESHOLD:
self.emit(record, age)
time.sleep(0.1)
def terminate(self) -> None:
self._terminate = True
if self._active_lines > 0:
click.echo(self.clear_lines(), file=sys.stderr, nl=False)
self._active_lines = 0
# Reset terminal.
click.echo(Format.WRAP_OVERFLOW + Format.SHOW_CURSOR, file=sys.stderr, nl=False)
sys.stderr.flush()
def initialize(noninteractive: bool) -> None:
global __handler
if __handler:
LOG.debug("Log handler already exists, skipping initialization.")
return
if noninteractive:
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(SectionFormatter())
stream_handler.setLevel(logging.DEBUG)
__handler = None
else:
stream_handler = TimedStreamHandler()
__handler = stream_handler
logging.addLevelName(PERFORMANCE, "PERFORMANCE")
logging.addLevelName(PROMPT, "PROMPT")
logging.addLevelName(SUCCESS, "SUCCESS")
logging.basicConfig(level=logging.DEBUG, handlers=[stream_handler])
def enable_file_logging(log_file: Path) -> None:
handler = logging.handlers.RotatingFileHandler(
str(log_file),
mode="a",
# Keep at most 5 log files on disk
backupCount=4,
# Limit the size of each log file to 10MB
maxBytes=10 * 1000 * 1000,
)
handler.setFormatter(SectionFormatter())
handler.setLevel(logging.DEBUG)
logger = logging.getLogger()
logger.addHandler(handler)
def cleanup() -> None:
global __handler
handler = __handler
if handler:
handler.terminate()
__handler = None
output = stdout.getvalue()
if output:
click.echo(output, nl=False)
if not output.endswith("\n"):
click.echo()
@contextlib.contextmanager
def configured_logger(noninteractive: bool) -> Generator[None, None, None]:
try:
initialize(noninteractive)
yield
finally:
cleanup()
@contextlib.contextmanager
def file_tailer(file_path: Path) -> Generator[Iterable[str], None, None]:
"""
This function yields a stream of string generated by following the last
part of the given file. In other words, the returned stream behaves roughtly
the same as `tail -F`: If the file being watched is left untouched, invoking
`next` on the returned stream will block indefinitely. If the file being
watched gets a line appended to the end of it, invoking `next` on the returned
stream will return the appended line. Leaving the context manager will cause
the returned stream to stop iteration next time `next` is invoked.
This API is intended to be used along with `StreamLogger` to concurrently
forward the content of a log file to the terminal in the background:
```
with file_tailer(log_file) as log_stream:
with StreamLogger(log_stream) as logger:
# Main thread logic happens here
...
logger.join()
```
"""
with subprocess.Popen(
["tail", "-F", "-n", "0", str(file_path)],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
universal_newlines=True,
) as tail:
try:
stdout = tail.stdout
if stdout is None:
raise RuntimeError(
"subprocess.Popen failed to set up a pipe for stdout"
)
yield stdout
finally:
tail.terminate()
class StreamLogger:
_should_stop_reading_stream = False
_current_section: Optional[str]
_server_log_pattern: Pattern[str] = re.compile(
r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} (\w+)(.*)"
)
def __init__(self, stream: Iterable[str]) -> None:
self._reader = threading.Thread(target=self._read_stream, args=(stream,))
self._reader.daemon = True
self._current_section = None
def join(self) -> None:
self._reader.join()
def _log_server_stderr_message(self, server_message: str) -> None:
line = server_message.rstrip()
match = self._server_log_pattern.match(line)
if match:
section = match.groups()[0]
message = match.groups()[1]
self._current_section = section
else:
section = self._current_section
message = line
if section == "ERROR":
LOG.error(message)
elif section == "INFO":
LOG.info(message)
elif section == "DUMP":
LOG.warning(message)
elif section == "WARNING":
LOG.warning(message)
elif section == "PROGRESS":
LOG.info(message)
elif section == "PARSER":
LOG.error(message)
elif section is not None:
LOG.debug("[%s] %s", section, message)
else:
LOG.debug(line)
def _read_stream(self, stream: Iterable[str]) -> None:
try:
for line in stream:
if self._should_stop_reading_stream:
return
self._log_server_stderr_message(line)
except Exception:
pass
def __enter__(self) -> "StreamLogger":
self._should_stop_reading_stream = False
self._reader.start()
return self
def __exit__(
self,
_type: Optional[BaseException],
_value: Optional[BaseException],
_traceback: Optional[TracebackType],
) -> None:
self._should_stop_reading_stream = True
def get_yes_no_input(prompt: str) -> bool:
choice = get_input(prompt, suffix=" [Y/n] ")
return choice.lower() in ["", "y", "ye", "yes"]
def get_optional_input(prompt: str, default: str) -> str:
result = get_input(prompt, suffix=f" (Default: `{default}`): ")
if result == "":
return default
return result
def get_input(prompt: str, suffix: str = "") -> str:
LOG.log(PROMPT, prompt + suffix)
return input().strip()
def truncate(message: str, size: int) -> str:
if len(message) <= size:
return message
return f"{message[:size]}..[truncated {len(message) - size} characters]"