client/commands/backend_arguments.py (296 lines of code) (raw):

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import contextlib
import dataclasses
import json
import logging
import os
import shutil
import tempfile
from pathlib import Path
from typing import Optional, Dict, List, Union, Sequence, Set, IO, Iterator, Any

from typing_extensions import Protocol

from .. import configuration as configuration_module, find_directories

LOG: logging.Logger = logging.getLogger(__name__)

# Base name of the artifact root used by `get_source_path_for_server`.
SERVER_ARTIFACT_ROOT_NAME: str = "link_trees"


@dataclasses.dataclass(frozen=True)
class RemoteLogging:
    """
    Remote logging settings: a logger name plus an identifier string.
    """

    logger: str
    identifier: str = ""

    @staticmethod
    def create(
        logger: Optional[str] = None, identifier: Optional[str] = None
    ) -> "Optional[RemoteLogging]":
        """
        Build a `RemoteLogging` from optional values.

        Returns `None` when `logger` is `None`. Otherwise a `None` or empty
        `identifier` is normalized to the empty string.
        """
        return (
            RemoteLogging(logger=logger, identifier=identifier or "")
            if logger is not None
            else None
        )

    def serialize(self) -> Dict[str, str]:
        """
        Return a dictionary with the `logger` and `identifier` fields.
        """
        return {"logger": self.logger, "identifier": self.identifier}


@dataclasses.dataclass(frozen=True)
class SimpleSourcePath:
    """
    Source path described by a plain sequence of search path elements.
    """

    elements: Sequence[configuration_module.SearchPathElement] = dataclasses.field(
        default_factory=list
    )

    def serialize(self) -> Dict[str, object]:
        """
        Return the dictionary form of this source path, tagged with kind
        `simple` and listing the command line argument of every element.
        """
        return {
            "kind": "simple",
            "paths": [element.command_line_argument() for element in self.elements],
        }

    def get_checked_directory_allowlist(self) -> Set[str]:
        """
        Return the set of `path()` values of all elements.
        """
        return {element.path() for element in self.elements}

    def cleanup(self) -> None:
        """
        Do nothing: this kind of source path has nothing to clean up.
        """
        pass


@dataclasses.dataclass(frozen=True)
class WithUnwatchedDependencySourcePath:
    """
    Source path described by search path elements plus an unwatched dependency.
    """

    # Serialized as the `root` of the unwatched dependency's change indicator.
    change_indicator_root: Path
    unwatched_dependency: configuration_module.UnwatchedDependency
    elements: Sequence[configuration_module.SearchPathElement] = dataclasses.field(
        default_factory=list
    )

    def serialize(self) -> Dict[str, object]:
        """
        Return the dictionary form of this source path, tagged with kind
        `with_unwatched_dependency`. In addition to the element paths, it
        carries the change indicator and the files of the unwatched dependency.
        """
        return {
            "kind": "with_unwatched_dependency",
            "paths": [element.command_line_argument() for element in self.elements],
            "unwatched_dependency": {
                "change_indicator": {
                    "root": str(self.change_indicator_root),
                    "relative": self.unwatched_dependency.change_indicator,
                },
                "files": {
                    "root": self.unwatched_dependency.files.root,
                    "checksum_path": self.unwatched_dependency.files.checksum_path,
                },
            },
        }

    def get_checked_directory_allowlist(self) -> Set[str]:
        """
        Return the set of `path()` values of all elements.
        """
        return {element.path() for element in self.elements}

    def cleanup(self) -> None:
        """
        Do nothing: this kind of source path has nothing to clean up.
        """
        pass


@dataclasses.dataclass(frozen=True)
class BuckSourcePath:
    """
    Source path described by a sequence of Buck targets.
    """

    source_root: Path
    # Directory removed (best-effort) by `cleanup()`.
    artifact_root: Path
    # The only entry of the allowlist reported by this source path.
    checked_directory: Path
    targets: Sequence[str] = dataclasses.field(default_factory=list)
    mode: Optional[str] = None
    isolation_prefix: Optional[str] = None
    use_buck2: bool = False

    def serialize(self) -> Dict[str, object]:
        """
        Return the dictionary form of this source path, tagged with kind `buck`.

        The `mode` and `isolation_prefix` keys are omitted entirely when the
        corresponding field is `None`. `checked_directory` is not serialized.
        """
        mode = self.mode
        isolation_prefix = self.isolation_prefix
        return {
            "kind": "buck",
            "targets": self.targets,
            **({} if mode is None else {"mode": mode}),
            **(
                {}
                if isolation_prefix is None
                else {"isolation_prefix": isolation_prefix}
            ),
            "use_buck2": self.use_buck2,
            "source_root": str(self.source_root),
            "artifact_root": str(self.artifact_root),
        }

    def get_checked_directory_allowlist(self) -> Set[str]:
        """
        Return a one-element set containing `checked_directory` as a string.
        """
        return {str(self.checked_directory)}

    def cleanup(self) -> None:
        """
        Remove the artifact root directory tree, ignoring any errors.
        """
        shutil.rmtree(str(self.artifact_root), ignore_errors=True)


SourcePath = Union[SimpleSourcePath, WithUnwatchedDependencySourcePath, BuckSourcePath]


@dataclasses.dataclass(frozen=True)
class BaseArguments:
    """
    Data structure for configuration options common to many backend commands.
    Need to keep in sync with `pyre/command/newCommandStartup.ml`
    """

    log_path: str
    global_root: str
    source_paths: SourcePath

    checked_directory_allowlist: Sequence[str] = dataclasses.field(default_factory=list)
    checked_directory_blocklist: Sequence[str] = dataclasses.field(default_factory=list)
    debug: bool = False
    excludes: Sequence[str] = dataclasses.field(default_factory=list)
    extensions: Sequence[str] = dataclasses.field(default_factory=list)
    relative_local_root: Optional[str] = None
    memory_profiling_output: Optional[Path] = None
    number_of_workers: int = 1
    parallel: bool = True
    profiling_output: Optional[Path] = None
    python_version: configuration_module.PythonVersion = (
        configuration_module.PythonVersion(major=3)
    )
    shared_memory: configuration_module.SharedMemory = (
        configuration_module.SharedMemory()
    )
    remote_logging: Optional[RemoteLogging] = None
    search_paths: Sequence[configuration_module.SearchPathElement] = dataclasses.field(
        default_factory=list
    )

    def get_local_root(self) -> Optional[str]:
        """
        Return `global_root` joined with `relative_local_root`, or `None` when
        no relative local root is set.
        """
        if self.relative_local_root is None:
            return None
        return os.path.join(self.global_root, self.relative_local_root)

    def serialize(self) -> Dict[str, Any]:
        """
        Return the dictionary form of these arguments.

        The `local_root`, `remote_logging`, `profiling_output` and
        `memory_profiling_output` keys are omitted entirely when the
        corresponding value is `None`.
        """
        local_root = self.get_local_root()
        return {
            "source_paths": self.source_paths.serialize(),
            "search_paths": [
                element.command_line_argument() for element in self.search_paths
            ],
            "excludes": self.excludes,
            "checked_directory_allowlist": self.checked_directory_allowlist,
            "checked_directory_blocklist": self.checked_directory_blocklist,
            "extensions": self.extensions,
            "log_path": self.log_path,
            "global_root": self.global_root,
            **({} if local_root is None else {"local_root": local_root}),
            "debug": self.debug,
            "python_version": {
                "major": self.python_version.major,
                "minor": self.python_version.minor,
                "micro": self.python_version.micro,
            },
            "shared_memory": self.shared_memory.to_json(),
            "parallel": self.parallel,
            "number_of_workers": self.number_of_workers,
            **(
                {}
                if self.remote_logging is None
                else {"remote_logging": self.remote_logging.serialize()}
            ),
            **(
                {}
                if self.profiling_output is None
                else {"profiling_output": str(self.profiling_output)}
            ),
            **(
                {}
                if self.memory_profiling_output is None
                else {"memory_profiling_output": str(self.memory_profiling_output)}
            ),
        }


def find_watchman_root(
    base: Path,
    stop_search_after: Optional[int] = None,
) -> Optional[Path]:
    """
    Look for a `.watchmanconfig` file starting from `base`, by delegating to
    `find_directories.find_parent_directory_containing_file`.
    `stop_search_after` is forwarded unchanged.
    """
    return find_directories.find_parent_directory_containing_file(
        base, ".watchmanconfig", stop_search_after
    )


def find_buck_root(
    base: Path,
    stop_search_after: Optional[int] = None,
) -> Optional[Path]:
    """
    Look for a `.buckconfig` file starting from `base`, by delegating to
    `find_directories.find_parent_directory_containing_file`.
    `stop_search_after` is forwarded unchanged.
    """
    return find_directories.find_parent_directory_containing_file(
        base, ".buckconfig", stop_search_after
    )


def find_buck2_root(
    base: Path,
    stop_search_after: Optional[int] = None,
) -> Optional[Path]:
    """
    Look for the Buck2 root starting from `base`, by delegating to
    `find_directories.find_outermost_directory_containing_file` with
    `.buckconfig`. `stop_search_after` is forwarded unchanged.
    """
    # Buck2 uses project root instead of cell root as its base directory.
    # This is essentially what `buck2 root --kind project` does.
    return find_directories.find_outermost_directory_containing_file(
        base, ".buckconfig", stop_search_after
    )


def _get_global_or_local_root(
    configuration: configuration_module.Configuration,
) -> Path:
    """
    Return the project root joined with the relative local root when one is
    configured, and the project root itself otherwise.
    """
    global_root = Path(configuration.project_root)
    relative_local_root = configuration.relative_local_root
    return (
        (global_root / relative_local_root)
        if relative_local_root is not None
        else global_root
    )


def get_source_path(
    configuration: configuration_module.Configuration, artifact_root_name: str
) -> SourcePath:
    """
    Build the `SourcePath` that corresponds to `configuration`.

    - Only `source_directories` set: returns a
      `WithUnwatchedDependencySourcePath` when the configuration has an
      existent unwatched dependency, and a `SimpleSourcePath` otherwise.
    - Only `targets` set: returns a `BuckSourcePath` whose artifact root is
      `artifact_root_name` under the configuration's dot-pyre directory.

    Raises `configuration_module.InvalidConfiguration` when both or neither
    of `source_directories` and `targets` are set, or when no buck root can
    be found for the targets.
    """
    source_directories = configuration.source_directories
    targets = configuration.targets
    buck_mode = configuration.buck_mode.get() if configuration.buck_mode else None

    if source_directories is not None and targets is None:
        elements: Sequence[
            configuration_module.SearchPathElement
        ] = configuration.get_source_directories()
        if not elements:
            LOG.warning("Pyre did not find an existent source directory.")

        unwatched_dependency = configuration.get_existent_unwatched_dependency()
        if unwatched_dependency is not None:
            return WithUnwatchedDependencySourcePath(
                change_indicator_root=_get_global_or_local_root(configuration),
                unwatched_dependency=unwatched_dependency,
                elements=elements,
            )
        return SimpleSourcePath(elements)

    if targets is not None and source_directories is None:
        if not targets:
            LOG.warning("Pyre did not find any targets to check.")

        use_buck2 = configuration.use_buck2
        search_base = _get_global_or_local_root(configuration)
        source_root = (
            find_buck2_root(search_base) if use_buck2 else find_buck_root(search_base)
        )
        if source_root is None:
            raise configuration_module.InvalidConfiguration(
                "Cannot find a buck root for the specified targets. "
                + "Make sure the project is covered by a `.buckconfig` file."
            )

        return BuckSourcePath(
            source_root=source_root,
            artifact_root=configuration.dot_pyre_directory / artifact_root_name,
            checked_directory=search_base,
            targets=targets,
            mode=buck_mode,
            isolation_prefix=configuration.isolation_prefix,
            use_buck2=use_buck2,
        )

    if source_directories is not None and targets is not None:
        raise configuration_module.InvalidConfiguration(
            "`source_directories` and `targets` are mutually exclusive"
        )

    raise configuration_module.InvalidConfiguration(
        "Cannot find any source files to analyze. "
        + "Either `source_directories` or `targets` must be specified."
    )


def get_source_path_for_server(
    configuration: configuration_module.Configuration,
) -> SourcePath:
    """
    Build the `SourcePath` for a server, using an artifact root name derived
    from `SERVER_ARTIFACT_ROOT_NAME` and the relative local root (if any).
    """
    # We know that for each source root there could be at most one server alive.
    # Therefore artifact root name can be a fixed constant.
    artifact_root_name = SERVER_ARTIFACT_ROOT_NAME
    relative_local_root = configuration.relative_local_root
    if relative_local_root is not None:
        # Prevent artifact roots of different local projects from clashing with
        # each other.
        artifact_root_name = str(Path(artifact_root_name) / relative_local_root)
    return get_source_path(configuration, artifact_root_name)


def get_source_path_for_check(
    configuration: configuration_module.Configuration,
) -> SourcePath:
    """
    Build the `SourcePath` for a one-off check, using the current process ID
    as the artifact root name.
    """
    # Artifact for one-off check command should not be a fixed constant, to prevent
    # concurrent check commands overwriting each other's artifacts. Here we use process
    # ID to isolate the artifact root of each individual check command.
    return get_source_path(configuration, str(os.getpid()))


def get_checked_directory_allowlist(
    configuration: configuration_module.Configuration, source_path: SourcePath
) -> List[str]:
    """
    Return the list of directories whose errors should be reported.

    When the configuration provides a non-empty explicit allowlist, it is
    returned as-is. Otherwise the allowlist inferred from `source_path` is
    returned, in sorted order.
    """
    explicit_allowlist = configuration.get_existent_do_not_ignore_errors_in_paths()
    # If allowlist paths were specifically provided, do not include inferred paths.
    if explicit_allowlist:
        return explicit_allowlist
    # The inferred allowlist is a set, whose iteration order may differ between
    # runs. Sort it so the resulting arguments are deterministic.
    return sorted(source_path.get_checked_directory_allowlist())


def get_profiling_log_path(log_directory: Path) -> Path:
    """
    Return the path of `profiling.log` inside `log_directory`.
    """
    return log_directory / "profiling.log"


class SerializableArguments(Protocol):
    """
    Structural type for any object exposing a `serialize()` method that
    returns a dictionary.
    """

    def serialize(self) -> Dict[str, Any]:
        ...
def _write_argument_file(
    output_file: IO[str], arguments: SerializableArguments
) -> None:
    """
    Serialize `arguments` to JSON, write the result into `output_file`, and
    flush the file.

    The serialized arguments are also logged (pretty-printed) at debug level.
    """
    LOG.info(f"Writing arguments into {output_file.name}...")
    serialized_arguments = arguments.serialize()
    # Pretty-printing is only needed for the debug log. Skip the extra JSON
    # encoding pass when that message would be discarded anyway.
    if LOG.isEnabledFor(logging.DEBUG):
        LOG.debug(f"Arguments:\n{json.dumps(serialized_arguments, indent=2)}")
    output_file.write(json.dumps(serialized_arguments))
    output_file.flush()


@contextlib.contextmanager
def temporary_argument_file(arguments: SerializableArguments) -> Iterator[Path]:
    """
    Context manager that writes `arguments` into a named temporary JSON file
    and yields the path of that file.

    The file is created with `tempfile.NamedTemporaryFile` and is only kept
    open for the duration of the `with` block.
    """
    with tempfile.NamedTemporaryFile(
        mode="w", prefix="pyre_arguments_", suffix=".json"
    ) as argument_file:
        _write_argument_file(argument_file, arguments)
        yield Path(argument_file.name)


@dataclasses.dataclass
class LogFile:
    """
    A log file, represented by its name and its open text stream.
    """

    name: str
    file: IO[str]


@contextlib.contextmanager
def backend_log_file(prefix: str) -> Iterator[LogFile]:
    """
    Context manager that creates a named temporary `.log` file whose name
    starts with `prefix`, and yields it as a `LogFile`.

    The temporary file is created with `delete=True` and is only kept open
    for the duration of the `with` block.
    """
    with tempfile.NamedTemporaryFile(
        mode="w", prefix=prefix, suffix=".log", delete=True
    ) as log_file:
        yield LogFile(name=log_file.name, file=log_file.file)