client/commands/rage.py (183 lines of code) (raw):
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import dataclasses
import datetime
import itertools
import json
import logging
import shutil
import subprocess
from pathlib import Path
from typing import Optional, TextIO, Sequence, List, Tuple
from .. import (
command_arguments,
configuration as configuration_module,
log,
version,
)
from . import commands, start, remote_logging
LOG: logging.Logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class Section:
name: str
content: str
def _print_section(section: Section, output: TextIO) -> None:
print(f"{section.name}", file=output)
separator = "=" * len(section.name)
print(f"{separator}", file=output)
print(section.content, file=output)
print("", file=output, flush=True)
def _version_section(configuration: configuration_module.Configuration) -> Section:
client_version_line = f"Client version: {version.__version__}"
try:
binary_version = configuration.get_binary_version()
binary_version_line = f"Binary version: {binary_version}"
except Exception as error:
binary_version_line = f"Could not determine binary version: {error}"
return Section(
name="Versions", content="\n".join([client_version_line, binary_version_line])
)
def _configuration_section(
configuration: configuration_module.Configuration,
) -> Section:
return Section(
name="Configuration", content=json.dumps(configuration.to_json(), indent=2)
)
def _get_subprocess_stdout(command: Sequence[str]) -> Optional[str]:
result = subprocess.run(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True
)
if result.returncode != 0:
return None
return result.stdout
def _get_file_content(path: Path) -> Optional[str]:
try:
return path.read_text()
except Exception:
return None
def _mercurial_section(
mercurial: str, name: str, additional_flags: Optional[Sequence[str]] = None
) -> Optional[Section]:
output = _get_subprocess_stdout(
[mercurial, name] + ([] if additional_flags is None else list(additional_flags))
)
return (
None
if output is None
else Section(name=f"Mercurial {name.capitalize()}", content=output)
)
def _watchman_section(watchman: str, name: str) -> Optional[Section]:
output = _get_subprocess_stdout([watchman, name])
return (
None
if output is None
else Section(name=f"Watchman {name.capitalize()}", content=output)
)
def _parse_log_file_name(name: str) -> Optional[datetime.datetime]:
try:
return datetime.datetime.strptime(name, start.SERVER_LOG_FILE_FORMAT)
except ValueError:
return None
def _get_server_log_timestamp_and_paths(
log_directory: Path,
) -> List[Tuple[datetime.datetime, Path]]:
try:
return sorted(
(
(timestamp, path)
for timestamp, path in (
(_parse_log_file_name(path.name), path)
for path in (log_directory / "new_server").iterdir()
if path.is_file()
)
if timestamp is not None
),
key=lambda pair: pair[0],
reverse=True,
)
except Exception:
return []
def _server_log_sections(
log_directory: Path, limit: Optional[int] = None
) -> List[Section]:
# Log files are sorted according to server start time: recently started servers
# will come first.
timestamp_and_paths = _get_server_log_timestamp_and_paths(log_directory)
sections: List[Section] = []
for timestamp, path in timestamp_and_paths:
if limit is not None and len(sections) >= limit:
break
content = _get_file_content(path)
if content is None:
continue
sections.append(Section(name=f"Server Log ({timestamp})", content=content))
return sections
def _client_log_section(log_directory: Path) -> Optional[Section]:
content = _get_file_content(log_directory / "pyre.stderr")
if content is None:
return None
return Section(name="Client Log", content=content)
def _print_configuration_sections(
configuration: configuration_module.Configuration, output: TextIO
) -> None:
LOG.info("Collecting information about Pyre configurations...")
_print_section(_version_section(configuration), output)
_print_section(_configuration_section(configuration), output)
def _print_mercurial_sections(output: TextIO) -> None:
LOG.info("Collecting information about mercurial...")
mercurial = shutil.which("hg")
if mercurial is not None:
for section in [
_mercurial_section(mercurial, "id"),
_mercurial_section(mercurial, "status"),
_mercurial_section(mercurial, "diff"),
_mercurial_section(
mercurial, "reflog", additional_flags=["--limit", "100"]
),
]:
if section is not None:
_print_section(section, output)
def _print_watchman_sections(output: TextIO) -> None:
LOG.info("Collecting information about watchman...")
watchman = shutil.which("watchman")
if watchman is not None:
for section in [_watchman_section(watchman, "watch-list")]:
if section is not None:
_print_section(section, output)
def _print_log_file_sections(
log_directory: Path, server_log_count: Optional[int], output: TextIO
) -> None:
LOG.info("Collecting information from Pyre's log files...")
for section in itertools.chain(
_server_log_sections(log_directory, limit=server_log_count),
[
_client_log_section(log_directory),
],
):
if section is not None:
_print_section(section, output)
def run_rage(
configuration: configuration_module.Configuration,
arguments: command_arguments.RageArguments,
output: TextIO,
) -> None:
_print_configuration_sections(configuration, output)
_print_mercurial_sections(output)
_print_watchman_sections(output)
_print_log_file_sections(
Path(configuration.log_directory), arguments.server_log_count, output
)
LOG.info("Done\n")
@remote_logging.log_usage(command_name="rage")
def run(
configuration: configuration_module.Configuration,
arguments: command_arguments.RageArguments,
) -> commands.ExitCode:
try:
output_path = arguments.output
if output_path is None:
run_rage(configuration, arguments, log.stdout)
else:
with open(output_path) as output:
run_rage(configuration, arguments, output)
return commands.ExitCode.SUCCESS
except Exception as error:
raise commands.ClientException(
f"Exception occurred during rage generation: {error}"
) from error