client/commands/check.py (172 lines of code) (raw):
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import contextlib
import dataclasses
import json
import logging
import subprocess
from pathlib import Path
from typing import Sequence, Dict, Any, Iterator, List
from .. import (
command_arguments,
configuration as configuration_module,
error,
)
from . import commands, remote_logging, backend_arguments, start, incremental
LOG: logging.Logger = logging.getLogger(__name__)
@dataclasses.dataclass(frozen=True)
class Arguments:
"""
Data structure for configuration options the backend check command can recognize.
Need to keep in sync with `source/command/checkCommand.ml`
"""
base_arguments: backend_arguments.BaseArguments
additional_logging_sections: Sequence[str] = dataclasses.field(default_factory=list)
show_error_traces: bool = False
strict: bool = False
def serialize(self) -> Dict[str, Any]:
return {
**self.base_arguments.serialize(),
"additional_logging_sections": self.additional_logging_sections,
"show_error_traces": self.show_error_traces,
"strict": self.strict,
}
def create_check_arguments(
configuration: configuration_module.Configuration,
check_arguments: command_arguments.CheckArguments,
) -> Arguments:
"""
Translate client configurations to backend check configurations.
This API is not pure since it needs to access filesystem to filter out
nonexistent directories. It is idempotent though, since it does not alter
any filesystem state.
"""
source_paths = backend_arguments.get_source_path_for_check(configuration)
logging_sections = check_arguments.logging_sections
additional_logging_sections = (
[] if logging_sections is None else logging_sections.split(",")
)
if check_arguments.noninteractive:
additional_logging_sections.append("-progress")
profiling_output = (
backend_arguments.get_profiling_log_path(Path(configuration.log_directory))
if check_arguments.enable_profiling
else None
)
memory_profiling_output = (
backend_arguments.get_profiling_log_path(Path(configuration.log_directory))
if check_arguments.enable_memory_profiling
else None
)
logger = configuration.logger
remote_logging = (
backend_arguments.RemoteLogging(
logger=logger, identifier=check_arguments.log_identifier or ""
)
if logger is not None
else None
)
return Arguments(
base_arguments=backend_arguments.BaseArguments(
log_path=configuration.log_directory,
global_root=configuration.project_root,
checked_directory_allowlist=backend_arguments.get_checked_directory_allowlist(
configuration, source_paths
),
checked_directory_blocklist=(
configuration.get_existent_ignore_all_errors_paths()
),
debug=check_arguments.debug,
excludes=configuration.excludes,
extensions=configuration.get_valid_extension_suffixes(),
relative_local_root=configuration.relative_local_root,
memory_profiling_output=memory_profiling_output,
number_of_workers=configuration.get_number_of_workers(),
parallel=not check_arguments.sequential,
profiling_output=profiling_output,
python_version=configuration.get_python_version(),
shared_memory=configuration.shared_memory,
remote_logging=remote_logging,
search_paths=configuration.expand_and_get_existent_search_paths(),
source_paths=source_paths,
),
additional_logging_sections=additional_logging_sections,
show_error_traces=check_arguments.show_error_traces,
strict=configuration.strict,
)
@contextlib.contextmanager
def create_check_arguments_and_cleanup(
configuration: configuration_module.Configuration,
check_arguments: command_arguments.CheckArguments,
) -> Iterator[Arguments]:
arguments = create_check_arguments(configuration, check_arguments)
try:
yield arguments
finally:
# It is safe to clean up source paths after check command since
# any created artifact directory won't be reused by other commands.
arguments.base_arguments.source_paths.cleanup()
class InvalidCheckResponse(Exception):
pass
def parse_type_error_response_json(response_json: object) -> List[error.Error]:
try:
# The response JSON is expected to have the following form:
# `{"errors": [error_json0, error_json1, ...]}`
if isinstance(response_json, dict):
errors_json = response_json.get("errors", [])
if isinstance(errors_json, list):
return [error.Error.from_json(error_json) for error_json in errors_json]
raise InvalidCheckResponse(
f"Unexpected JSON response from check command: {response_json}"
)
except error.ErrorParsingFailure as parsing_error:
message = f"Unexpected error JSON from check command: {parsing_error}"
raise InvalidCheckResponse(message) from parsing_error
def parse_type_error_response(response: str) -> List[error.Error]:
try:
response_json = json.loads(response)
return parse_type_error_response_json(response_json)
except json.JSONDecodeError as decode_error:
message = f"Cannot parse response as JSON: {decode_error}"
raise InvalidCheckResponse(message) from decode_error
def _run_check_command(command: Sequence[str], output: str) -> commands.ExitCode:
with backend_arguments.backend_log_file(prefix="pyre_check") as log_file:
with start.background_logging(Path(log_file.name)):
result = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=log_file.file,
universal_newlines=True,
)
return_code = result.returncode
# Interpretation of the return code needs to be kept in sync with
# `source/command/checkCommand.ml`.
if return_code == 0:
type_errors = parse_type_error_response(result.stdout)
incremental.display_type_errors(type_errors, output=output)
return (
commands.ExitCode.SUCCESS
if len(type_errors) == 0
else commands.ExitCode.FOUND_ERRORS
)
elif return_code == 2:
LOG.error("Pyre encountered a failure within buck.")
return commands.ExitCode.BUCK_INTERNAL_ERROR
elif return_code == 3:
LOG.error("Pyre encountered an error when building the buck targets.")
return commands.ExitCode.BUCK_USER_ERROR
else:
LOG.error(
f"Check command exited with non-zero return code: {return_code}."
)
return commands.ExitCode.FAILURE
def run_check(
configuration: configuration_module.Configuration,
check_arguments: command_arguments.CheckArguments,
) -> commands.ExitCode:
binary_location = configuration.get_binary_respecting_override()
if binary_location is None:
raise configuration_module.InvalidConfiguration(
"Cannot locate a Pyre binary to run."
)
with create_check_arguments_and_cleanup(
configuration, check_arguments
) as arguments:
with backend_arguments.temporary_argument_file(arguments) as argument_file_path:
check_command = [binary_location, "newcheck", str(argument_file_path)]
return _run_check_command(
command=check_command, output=check_arguments.output
)
@remote_logging.log_usage(command_name="check")
def run(
configuration: configuration_module.Configuration,
check_arguments: command_arguments.CheckArguments,
) -> commands.ExitCode:
try:
return run_check(configuration, check_arguments)
except Exception as error:
raise commands.ClientException(
f"Exception occurred during Pyre check: {error}"
) from error