client/commands/analyze.py:
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import contextlib
import dataclasses
import json
import logging
import subprocess
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional, Sequence

from .. import (
    command_arguments,
    configuration as configuration_module,
    error as error_module,
    log,
)
from . import backend_arguments, commands, remote_logging, start, validate_models

LOG: logging.Logger = logging.getLogger(__name__)


@dataclasses.dataclass(frozen=True)
class Arguments:
    """
    Data structure for configuration options the backend analyze command can
    recognize. Needs to be kept in sync with `source/command/analyzeCommand.ml`.
    """

    base_arguments: backend_arguments.BaseArguments

    dump_call_graph: Optional[str] = None
    dump_model_query_results: Optional[str] = None
    find_missing_flows: Optional[str] = None
    inline_decorators: bool = False
    maximum_tito_depth: Optional[int] = None
    maximum_trace_length: Optional[int] = None
    no_verify: bool = False
    repository_root: Optional[str] = None
    rule_filter: Optional[Sequence[int]] = None
    save_results_to: Optional[str] = None
    strict: bool = False
    taint_model_paths: Sequence[str] = dataclasses.field(default_factory=list)
    use_cache: bool = False

    def serialize(self) -> Dict[str, Any]:
        # Pull optional fields into locals so the `None` checks below can narrow them.
        dump_call_graph = self.dump_call_graph
        dump_model_query_results = self.dump_model_query_results
        find_missing_flows = self.find_missing_flows
        maximum_tito_depth = self.maximum_tito_depth
        maximum_trace_length = self.maximum_trace_length
        repository_root = self.repository_root
        rule_filter = self.rule_filter
        save_results_to = self.save_results_to
        return {
            **self.base_arguments.serialize(),
            **({} if dump_call_graph is None else {"dump_call_graph": dump_call_graph}),
            **(
                {}
                if dump_model_query_results is None
                else {"dump_model_query_results": dump_model_query_results}
            ),
            **(
                {}
                if find_missing_flows is None
                else {"find_missing_flows": find_missing_flows}
            ),
            "inline_decorators": self.inline_decorators,
            **(
                {}
                if maximum_tito_depth is None
                else {"maximum_tito_depth": maximum_tito_depth}
            ),
            **(
                {}
                if maximum_trace_length is None
                else {"maximum_trace_length": maximum_trace_length}
            ),
            "no_verify": self.no_verify,
            **({} if repository_root is None else {"repository_root": repository_root}),
            **({} if rule_filter is None else {"rule_filter": rule_filter}),
            **({} if save_results_to is None else {"save_results_to": save_results_to}),
            "strict": self.strict,
            "taint_model_paths": self.taint_model_paths,
            "use_cache": self.use_cache,
        }
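

# Illustrative only: with every optional field left as `None`, `serialize()` emits
# just the base arguments plus the boolean/list fields, omitting the `None`-valued
# options entirely. Assuming a hypothetical `base: backend_arguments.BaseArguments`:
#
#     Arguments(base_arguments=base, no_verify=True).serialize()
#     # => {**base.serialize(), "inline_decorators": False, "no_verify": True,
#     #     "strict": False, "taint_model_paths": [], "use_cache": False}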


def create_analyze_arguments(
    configuration: configuration_module.Configuration,
    analyze_arguments: command_arguments.AnalyzeArguments,
) -> Arguments:
    """
    Translate client configurations to backend analyze configurations.

    This API is not pure since it needs to access the filesystem to filter out
    nonexistent directories. It is idempotent, though, since it does not alter
    any filesystem state.
    """
    source_paths = backend_arguments.get_source_path_for_check(configuration)

    profiling_output = (
        backend_arguments.get_profiling_log_path(Path(configuration.log_directory))
        if analyze_arguments.enable_profiling
        else None
    )
    memory_profiling_output = (
        backend_arguments.get_profiling_log_path(Path(configuration.log_directory))
        if analyze_arguments.enable_memory_profiling
        else None
    )

    logger = configuration.logger
    remote_logging = (
        backend_arguments.RemoteLogging(
            logger=logger, identifier=analyze_arguments.log_identifier or ""
        )
        if logger is not None
        else None
    )

    find_missing_flows = analyze_arguments.find_missing_flows
    rule = analyze_arguments.rule

    # Fall back to the configuration's taint models when none are passed explicitly.
    taint_models_path = analyze_arguments.taint_models_path
    if len(taint_models_path) == 0:
        taint_models_path = configuration.taint_models_path

    repository_root = analyze_arguments.repository_root
    if repository_root is not None:
        repository_root = str(Path(repository_root).resolve(strict=False))

    return Arguments(
        base_arguments=backend_arguments.BaseArguments(
            log_path=configuration.log_directory,
            global_root=configuration.project_root,
            checked_directory_allowlist=backend_arguments.get_checked_directory_allowlist(
                configuration, source_paths
            ),
            checked_directory_blocklist=(
                configuration.get_existent_ignore_all_errors_paths()
            ),
            debug=analyze_arguments.debug,
            excludes=configuration.excludes,
            extensions=configuration.get_valid_extension_suffixes(),
            relative_local_root=configuration.relative_local_root,
            memory_profiling_output=memory_profiling_output,
            number_of_workers=configuration.get_number_of_workers(),
            parallel=not analyze_arguments.sequential,
            profiling_output=profiling_output,
            python_version=configuration.get_python_version(),
            shared_memory=configuration.shared_memory,
            remote_logging=remote_logging,
            search_paths=configuration.expand_and_get_existent_search_paths(),
            source_paths=source_paths,
        ),
        dump_call_graph=analyze_arguments.dump_call_graph,
        dump_model_query_results=analyze_arguments.dump_model_query_results,
        find_missing_flows=(
            str(find_missing_flows.value) if find_missing_flows is not None else None
        ),
        inline_decorators=analyze_arguments.inline_decorators,
        maximum_tito_depth=analyze_arguments.maximum_tito_depth,
        maximum_trace_length=analyze_arguments.maximum_trace_length,
        no_verify=analyze_arguments.no_verify,
        repository_root=repository_root,
        rule_filter=None if len(rule) == 0 else rule,
        save_results_to=analyze_arguments.save_results_to,
        strict=configuration.strict,
        taint_model_paths=taint_models_path,
        use_cache=analyze_arguments.use_cache,
    )


@contextlib.contextmanager
def create_analyze_arguments_and_cleanup(
    configuration: configuration_module.Configuration,
    analyze_arguments: command_arguments.AnalyzeArguments,
) -> Iterator[Arguments]:
    arguments = create_analyze_arguments(configuration, analyze_arguments)
    try:
        yield arguments
    finally:
        # It is safe to clean up source paths after the analyze command, since
        # any created artifact directory won't be reused by other commands.
        arguments.base_arguments.source_paths.cleanup()


def parse_taint_configuration_errors(
    response: str,
) -> List[error_module.TaintConfigurationError]:
    response_json = json.loads(response)
    errors = response_json.get("errors", [])
    return [error_module.TaintConfigurationError.from_json(error) for error in errors]


def parse_model_validation_errors(
    response: str,
) -> List[error_module.ModelVerificationError]:
    response_json = json.loads(response)
    return validate_models.parse_validation_errors(response_json)
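

# For reference: the backend reports taint-configuration errors on stdout as a JSON
# object of the form `{"errors": [...]}` (see the return-code 10 branch below), and
# each list element is decoded by `error_module.TaintConfigurationError.from_json`.
# The per-error fields are defined by the backend and are not spelled out here.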


def _run_analyze_command(
    command: Sequence[str], output: str, forward_stdout: bool
) -> commands.ExitCode:
    with backend_arguments.backend_log_file(prefix="pyre_analyze") as log_file:
        with start.background_logging(Path(log_file.name)):
            result = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=log_file.file,
                universal_newlines=True,
            )
            return_code = result.returncode

            # Interpretation of the return code needs to be kept in sync with
            # `command/newAnalyzeCommand.ml`.
            if return_code == 0:
                if forward_stdout:
                    log.stdout.write(result.stdout)
                return commands.ExitCode.SUCCESS
            elif return_code == 2:
                LOG.error("Pyre encountered a failure within buck.")
                return commands.ExitCode.BUCK_INTERNAL_ERROR
            elif return_code == 3:
                LOG.error("Pyre encountered an error when building the buck targets.")
                return commands.ExitCode.BUCK_USER_ERROR
            elif return_code == 10:
                error_module.print_errors(
                    parse_taint_configuration_errors(result.stdout),
                    output=output,
                    error_kind="taint configuration",
                )
                return commands.ExitCode.TAINT_CONFIGURATION_ERROR
            elif return_code == 11:
                error_module.print_errors(
                    parse_model_validation_errors(result.stdout),
                    output=output,
                    error_kind="model verification",
                )
                return commands.ExitCode.MODEL_VERIFICATION_ERROR
            else:
                LOG.error(
                    f"Analyze command exited with non-zero return code: {return_code}."
                )
                return commands.ExitCode.FAILURE


def run_analyze(
    configuration: configuration_module.Configuration,
    analyze_arguments: command_arguments.AnalyzeArguments,
) -> commands.ExitCode:
    binary_location = configuration.get_binary_respecting_override()
    if binary_location is None:
        raise configuration_module.InvalidConfiguration(
            "Cannot locate a Pyre binary to run."
        )

    with create_analyze_arguments_and_cleanup(
        configuration, analyze_arguments
    ) as arguments:
        with backend_arguments.temporary_argument_file(arguments) as argument_file_path:
            analyze_command = [binary_location, "newanalyze", str(argument_file_path)]
            return _run_analyze_command(
                command=analyze_command,
                output=analyze_arguments.output,
                # Only forward backend stdout when results are not saved to a file.
                forward_stdout=(analyze_arguments.save_results_to is None),
            )
@remote_logging.log_usage(command_name="analyze")
def run(
configuration: configuration_module.Configuration,
analyze_arguments: command_arguments.AnalyzeArguments,
) -> commands.ExitCode:
try:
return run_analyze(configuration, analyze_arguments)
except Exception as error:
raise commands.ClientException(
f"Exception occurred during pyre analyze: {error}"
) from error
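

# Illustrative call site (hypothetical `configuration` and `analyze_arguments`
# values, e.g. as assembled by the `pyre` CLI entry point):
#
#     exit_code = run(configuration, analyze_arguments)
#     # `exit_code` is a `commands.ExitCode`: SUCCESS on a clean run, with taint
#     # configuration and model verification failures mapped to their dedicated
#     # codes by `_run_analyze_command` above.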