client/configuration.py (1,425 lines of code) (raw):

# Copyright (c) Meta Platforms, Inc. and affiliates. # # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. # pyre-unsafe import abc import dataclasses import glob import json import logging import multiprocessing import os import platform import re import shutil import site import subprocess import sys import textwrap from dataclasses import dataclass, field from logging import Logger from pathlib import Path from typing import ( Any, Callable, ClassVar, Dict, Generic, Iterable, List, Optional, Sequence, Set, Type, TypeVar, Union, ) from . import command_arguments, find_directories from .exceptions import EnvironmentException from .filesystem import assert_readable_directory, expand_relative_path from .find_directories import ( BINARY_NAME, CONFIGURATION_FILE, LOCAL_CONFIGURATION_FILE, LOG_DIRECTORY, get_relative_local_root, ) LOG: Logger = logging.getLogger(__name__) T = TypeVar("T") def _expand_global_root(path: str, global_root: str) -> str: if path.startswith("//"): return expand_relative_path(global_root, path[2:]) return path def _expand_relative_root(path: str, relative_root: str) -> str: if not path.startswith("//"): return expand_relative_path(relative_root, path) return path def _get_optional_value(source: Optional[T], default: T) -> T: return source if source is not None else default def _expand_and_get_existent_ignore_all_errors_path( ignore_all_errors: Iterable[str], project_root: str ) -> List[str]: expanded_ignore_paths = [] for path in ignore_all_errors: expanded = glob.glob(_expand_global_root(path, global_root=project_root)) if not expanded: expanded_ignore_paths.append(path) else: expanded_ignore_paths.extend(expanded) paths = [] for path in expanded_ignore_paths: if os.path.exists(path): paths.append(path) else: LOG.warning(f"Nonexistent paths passed in to `ignore_all_errors`: `{path}`") return paths class InvalidConfiguration(Exception): def __init__(self, message: str) -> None: self.message = f"Invalid configuration: {message}" super().__init__(self.message) class InvalidPythonVersion(InvalidConfiguration): def __init__(self, message: str) -> None: super().__init__(message) class SearchPathElement(abc.ABC): @abc.abstractmethod def path(self) -> str: raise NotImplementedError @abc.abstractmethod def get_root(self) -> str: raise NotImplementedError @abc.abstractmethod def command_line_argument(self) -> str: raise NotImplementedError @abc.abstractmethod def expand_global_root(self, global_root: str) -> "SearchPathElement": raise NotImplementedError @abc.abstractmethod def expand_relative_root(self, relative_root: str) -> "SearchPathElement": raise NotImplementedError @abc.abstractmethod def expand_glob(self) -> List["SearchPathElement"]: raise NotImplementedError @dataclasses.dataclass(frozen=True) class SimpleSearchPathElement(SearchPathElement): root: str def path(self) -> str: return self.root def get_root(self) -> str: return self.root def command_line_argument(self) -> str: return self.root def expand_global_root(self, global_root: str) -> SearchPathElement: return SimpleSearchPathElement( _expand_global_root(self.root, global_root=global_root) ) def expand_relative_root(self, relative_root: str) -> SearchPathElement: return SimpleSearchPathElement( _expand_relative_root(self.root, relative_root=relative_root) ) def expand_glob(self) -> List[SearchPathElement]: expanded = sorted(glob.glob(self.get_root())) if expanded: return [SimpleSearchPathElement(path) for path in expanded] else: LOG.warning(f"'{self.path()}' does not match any paths.") return [] @dataclasses.dataclass(frozen=True) class SubdirectorySearchPathElement(SearchPathElement): root: str subdirectory: str def path(self) -> str: return os.path.join(self.root, self.subdirectory) def get_root(self) -> str: return self.root def command_line_argument(self) -> str: return self.root + "$" + self.subdirectory def expand_global_root(self, global_root: str) -> SearchPathElement: return SubdirectorySearchPathElement( root=_expand_global_root(self.root, global_root=global_root), subdirectory=self.subdirectory, ) def expand_relative_root(self, relative_root: str) -> SearchPathElement: return SubdirectorySearchPathElement( root=_expand_relative_root(self.root, relative_root=relative_root), subdirectory=self.subdirectory, ) def expand_glob(self) -> List["SearchPathElement"]: return [self] @dataclasses.dataclass(frozen=True) class SitePackageSearchPathElement(SearchPathElement): site_root: str package_name: str is_toplevel_module: bool = False def package_path(self) -> str: module_suffix = ".py" if self.is_toplevel_module else "" return self.package_name + module_suffix def path(self) -> str: return os.path.join(self.site_root, self.package_path()) def get_root(self) -> str: return self.site_root def command_line_argument(self) -> str: return self.site_root + "$" + self.package_path() def expand_global_root(self, global_root: str) -> SearchPathElement: # Site package does not participate in root expansion. return self def expand_relative_root(self, relative_root: str) -> SearchPathElement: # Site package does not participate in root expansion. return self def expand_glob(self) -> List["SearchPathElement"]: return [self] @dataclasses.dataclass class ExtensionElement: suffix: str include_suffix_in_module_qualifier: bool def command_line_argument(self) -> str: options = "" if self.include_suffix_in_module_qualifier: options = "$" + "include_suffix_in_module_qualifier" return self.suffix + options @staticmethod def from_json(json: Union[str, Dict[str, Union[str, bool]]]) -> "ExtensionElement": if isinstance(json, str): return ExtensionElement( suffix=json, include_suffix_in_module_qualifier=False ) elif isinstance(json, dict): include_suffix_in_module_qualifier = False if "include_suffix_in_module_qualifier" in json: value = json["include_suffix_in_module_qualifier"] if isinstance(value, bool): include_suffix_in_module_qualifier = value if "suffix" in json: suffix = json["suffix"] if isinstance(suffix, str): return ExtensionElement( suffix=suffix, include_suffix_in_module_qualifier=include_suffix_in_module_qualifier, ) raise InvalidConfiguration(f"Invalid extension element: {json}") def get_site_roots() -> List[str]: try: return site.getsitepackages() + [site.getusersitepackages()] except AttributeError: # There are a few Python versions that ship with a broken venv, # where `getsitepackages` is not available. LOG.warning( "Either `site.getusersitepackages()` or `site.getsitepackages()` " + "is not available in your virtualenv. This is a known virtualenv " + 'bug and as a workaround please avoid using `"site-package"` in ' + "your search path configuration." ) return [] def create_search_paths( json: Union[str, Dict[str, Union[str, bool]]], site_roots: Iterable[str] ) -> List[SearchPathElement]: if isinstance(json, str): return [SimpleSearchPathElement(json)] elif isinstance(json, dict): if "root" in json and "subdirectory" in json: return [ SubdirectorySearchPathElement( root=str(json["root"]), subdirectory=str(json["subdirectory"]) ) ] if "import_root" in json and "source" in json: return [ SubdirectorySearchPathElement( root=str(json["import_root"]), subdirectory=str(json["source"]) ) ] elif "site-package" in json: is_toplevel_module = ( "is_toplevel_module" in json and json["is_toplevel_module"] ) return [ SitePackageSearchPathElement( site_root=root, package_name=str(json["site-package"]), is_toplevel_module=bool(is_toplevel_module), ) for root in site_roots ] raise InvalidConfiguration(f"Invalid search path element: {json}") def assert_readable_directory_in_configuration( directory: str, field_name: str = "" ) -> None: try: assert_readable_directory(directory, error_message_prefix=f"{field_name} ") except EnvironmentException as error: raise InvalidConfiguration(str(error)) def _in_virtual_environment(override: Optional[bool] = None) -> bool: if override is not None: return override return sys.prefix != sys.base_prefix def _expand_and_get_existent_paths( paths: Sequence[SearchPathElement], ) -> List[SearchPathElement]: expanded_search_paths = [ expanded_path for search_path_element in paths for expanded_path in search_path_element.expand_glob() ] existent_paths = [] for search_path_element in expanded_search_paths: search_path = search_path_element.path() if os.path.exists(search_path): existent_paths.append(search_path_element) else: LOG.warning(f"Path does not exist: {search_path}") return existent_paths PLATFORM_MAPPING = { "Darwin": "macos", "Windows": "windows", "Linux": "linux", "default": "default", } @dataclass(frozen=True) class PlatformAware(Generic[T]): default: T windows: Optional[T] = None macos: Optional[T] = None linux: Optional[T] = None @staticmethod def from_json( value: Union[T, Dict[str, T]], field_name: str ) -> Optional["PlatformAware"]: if value is None: return None elif isinstance(value, Dict): if len(value) == 0: return None invalid_keys = value.keys() - PLATFORM_MAPPING.values() if not len(invalid_keys) == 0: raise InvalidConfiguration( f"Configuration `{field_name}` only supports platforms: " f"{PLATFORM_MAPPING.values()} but got: `{invalid_keys}`." ) default = ( value["default"] if "default" in value else value[sorted(value.keys())[0]] ) return PlatformAware( default=default, windows=value["windows"] if "windows" in value else None, macos=value["macos"] if "macos" in value else None, linux=value["linux"] if "linux" in value else None, ) else: return PlatformAware(default=value) @staticmethod def merge( base: Optional["PlatformAware"], override: Optional["PlatformAware"] ) -> Optional["PlatformAware"]: if base is None: return override elif override is None: return base else: return PlatformAware( default=override.default, windows=override.windows if override.windows is not None else base.windows, macos=override.macos if override.macos is not None else base.macos, linux=override.linux if override.linux is not None else base.linux, ) def get(self, key: Optional[str] = None) -> T: if key is None: key = PLATFORM_MAPPING[platform.system()] value: T = self.__getattribute__(key) return value if value is not None else self.default def validate_values(self, check: Callable[[T], bool]): return ( check(self.default) and (check(self.windows) if self.windows is not None else True) and (check(self.macos) if self.macos is not None else True) and (check(self.linux) if self.linux is not None else True) ) def to_json(self): result = {"default": self.default} if self.windows is not None: result["windows"] = self.windows if self.linux is not None: result["linux"] = self.linux if self.macos is not None: result["macos"] = self.macos return result @dataclass(frozen=True) class PythonVersion: major: int minor: int = 0 micro: int = 0 @staticmethod def from_string(input: str) -> "PythonVersion": try: splits = input.split(".") if len(splits) == 1: return PythonVersion(major=int(splits[0])) elif len(splits) == 2: return PythonVersion(major=int(splits[0]), minor=int(splits[1])) elif len(splits) == 3: return PythonVersion( major=int(splits[0]), minor=int(splits[1]), micro=int(splits[2]) ) raise InvalidPythonVersion( "Version string is expected to have the form of 'X.Y.Z' but got " + f"'{input}'" ) except ValueError as error: raise InvalidPythonVersion(str(error)) def to_string(self) -> str: return f"{self.major}.{self.minor}.{self.micro}" @dataclass(frozen=True) class SharedMemory: heap_size: Optional[int] = None dependency_table_power: Optional[int] = None hash_table_power: Optional[int] = None def to_json(self) -> Dict[str, int]: heap_size = self.heap_size dependency_table_power = self.dependency_table_power hash_table_power = self.hash_table_power return { **({"heap_size": heap_size} if heap_size is not None else {}), **( {"dependency_table_power": dependency_table_power} if dependency_table_power is not None else {} ), **( {"hash_table_power": hash_table_power} if hash_table_power is not None else {} ), } @dataclass(frozen=True) class IdeFeatures: hover_enabled: Optional[bool] = None DEFAULT_HOVER_ENABLED: ClassVar[bool] = False go_to_definition_enabled: Optional[bool] = None DEFAULT_GO_TO_DEFINITION_ENABLED: ClassVar[bool] = False def to_json(self) -> Dict[str, int]: return { **( {"hover_enabled": self.hover_enabled} if self.hover_enabled is not None else {} ), **( {"go_to_definition_enabled": self.go_to_definition_enabled} if self.go_to_definition_enabled is not None else {} ), } def is_hover_enabled(self) -> bool: return ( self.hover_enabled if self.hover_enabled is not None else self.DEFAULT_HOVER_ENABLED ) def is_go_to_definition_enabled(self) -> bool: return ( self.go_to_definition_enabled if self.go_to_definition_enabled is not None else self.DEFAULT_GO_TO_DEFINITION_ENABLED ) @dataclass(frozen=True) class UnwatchedFiles: root: str checksum_path: str @staticmethod def from_json(json: Dict[str, object]) -> "UnwatchedFiles": root = json.get("root", None) if root is None: raise InvalidConfiguration("Missing `root` field in UnwatchedFiles") if not isinstance(root, str): raise InvalidConfiguration( "`root` field in UnwatchedFiles must be a string" ) checksum_path = json.get("checksum_path", None) if checksum_path is None: raise InvalidConfiguration( "Missing `checksum_path` field in UnwatchedFiles" ) if not isinstance(checksum_path, str): raise InvalidConfiguration( "`checksum_path` field in UnwatchedFiles must be a string" ) return UnwatchedFiles(root=root, checksum_path=checksum_path) def to_json(self) -> Dict[str, str]: return { "root": self.root, "checksum_path": self.checksum_path, } @dataclass(frozen=True) class UnwatchedDependency: change_indicator: str files: UnwatchedFiles @staticmethod def from_json(json: Dict[str, object]) -> "UnwatchedDependency": change_indicator = json.get("change_indicator", None) if change_indicator is None: raise InvalidConfiguration( "Missing `change_indicator` field in UnwatchedDependency" ) if not isinstance(change_indicator, str): raise InvalidConfiguration( "`change_indicator` field in UnwatchedDependency must be a string" ) files_json = json.get("files", None) if files_json is None: raise InvalidConfiguration("Missing `files` field in UnwatchedDependency") if not isinstance(files_json, dict): raise InvalidConfiguration( "`files` field in UnwatchedDependency must be a dict" ) return UnwatchedDependency( change_indicator=change_indicator, files=UnwatchedFiles.from_json(files_json), ) def to_json(self) -> Dict[str, object]: return { "change_indicator": str(self.change_indicator), "files": self.files.to_json(), } @dataclass(frozen=True) class PartialConfiguration: binary: Optional[str] = None buck_mode: Optional[PlatformAware[str]] = None disabled: Optional[bool] = None do_not_ignore_errors_in: Sequence[str] = field(default_factory=list) dot_pyre_directory: Optional[Path] = None excludes: Sequence[str] = field(default_factory=list) extensions: Sequence[ExtensionElement] = field(default_factory=list) ide_features: Optional[IdeFeatures] = None ignore_all_errors: Sequence[str] = field(default_factory=list) ignore_infer: Sequence[str] = field(default_factory=list) isolation_prefix: Optional[str] = None logger: Optional[str] = None number_of_workers: Optional[int] = None oncall: Optional[str] = None other_critical_files: Sequence[str] = field(default_factory=list) pysa_version_hash: Optional[str] = None python_version: Optional[PythonVersion] = None shared_memory: SharedMemory = SharedMemory() search_path: Sequence[SearchPathElement] = field(default_factory=list) source_directories: Optional[Sequence[SearchPathElement]] = None strict: Optional[bool] = None taint_models_path: Sequence[str] = field(default_factory=list) targets: Optional[Sequence[str]] = None typeshed: Optional[str] = None unwatched_dependency: Optional[UnwatchedDependency] = None use_buck2: Optional[bool] = None version_hash: Optional[str] = None @staticmethod def _get_depreacted_map() -> Dict[str, str]: return {"do_not_check": "ignore_all_errors"} @staticmethod def _get_extra_keys() -> Set[str]: return { "create_open_source_configuration", "saved_state", "stable_client", "taint_models_path", "unstable_client", } @staticmethod def from_command_arguments( arguments: command_arguments.CommandArguments, ) -> "PartialConfiguration": strict: Optional[bool] = True if arguments.strict else None source_directories = [ SimpleSearchPathElement(element) for element in arguments.source_directories ] or None targets: Optional[List[str]] = ( arguments.targets if len(arguments.targets) > 0 else None ) python_version_string = arguments.python_version ide_features = ( IdeFeatures( hover_enabled=arguments.enable_hover, go_to_definition_enabled=arguments.enable_go_to_definition, ) if arguments.enable_hover is not None or arguments.enable_go_to_definition is not None else None ) return PartialConfiguration( binary=arguments.binary, buck_mode=PlatformAware.from_json(arguments.buck_mode, "buck_mode"), disabled=None, do_not_ignore_errors_in=arguments.do_not_ignore_errors_in, dot_pyre_directory=arguments.dot_pyre_directory, excludes=arguments.exclude, extensions=[], ide_features=ide_features, ignore_all_errors=[], ignore_infer=[], isolation_prefix=arguments.isolation_prefix, logger=arguments.logger, number_of_workers=arguments.number_of_workers, oncall=None, other_critical_files=[], pysa_version_hash=None, python_version=( PythonVersion.from_string(python_version_string) if python_version_string is not None else None ), shared_memory=SharedMemory( heap_size=arguments.shared_memory_heap_size, dependency_table_power=arguments.shared_memory_dependency_table_power, hash_table_power=arguments.shared_memory_hash_table_power, ), search_path=[ SimpleSearchPathElement(element) for element in arguments.search_path ], source_directories=source_directories, strict=strict, taint_models_path=[], targets=targets, typeshed=arguments.typeshed, unwatched_dependency=None, use_buck2=arguments.use_buck2, version_hash=None, ) @staticmethod def from_string(contents: str) -> "PartialConfiguration": def is_list_of_string(elements: object) -> bool: return isinstance(elements, list) and all( isinstance(element, str) for element in elements ) def ensure_option_type( json: Dict[str, Any], name: str, expected_type: Type[T] ) -> Optional[T]: result = json.pop(name, None) if result is None: return None elif isinstance(result, expected_type): return result raise InvalidConfiguration( f"Configuration `{name}` is expected to have type " f"{expected_type} but got: `{json}`." ) def ensure_optional_string_or_string_dict( json: Dict[str, Any], name: str ) -> Optional[Union[Dict[str, str], str]]: result = json.pop(name, None) if result is None: return None elif isinstance(result, str): return result elif isinstance(result, Dict): for value in result.values(): if not isinstance(value, str): raise InvalidConfiguration( f"Configuration `{name}` is expected to be a " + f"dict of strings but got `{json}`." ) return result raise InvalidConfiguration( f"Configuration `{name}` is expected to be a string or a " + f"dict of strings but got `{json}`." ) def ensure_optional_string_list( json: Dict[str, Any], name: str ) -> Optional[List[str]]: result = json.pop(name, None) if result is None: return None elif is_list_of_string(result): return result raise InvalidConfiguration( f"Configuration `{name}` is expected to be a list of " + f"strings but got `{json}`." ) def ensure_string_list( json: Dict[str, Any], name: str, allow_single_string: bool = False ) -> List[str]: result = json.pop(name, []) if allow_single_string and isinstance(result, str): result = [result] if is_list_of_string(result): return result raise InvalidConfiguration( f"Configuration `{name}` is expected to be a list of " + f"strings but got `{json}`." ) def ensure_list(json: Dict[str, Any], name: str) -> List[Any]: result = json.pop(name, []) if isinstance(result, list): return result raise InvalidConfiguration( f"Configuration `{name}` is expected to be a list but got `{json}`." ) try: configuration_json = json.loads(contents) dot_pyre_directory = ensure_option_type( configuration_json, "dot_pyre_directory", str ) search_path_json = configuration_json.pop("search_path", []) if isinstance(search_path_json, list): search_path = [ element for json in search_path_json for element in create_search_paths( json, site_roots=get_site_roots() ) ] else: search_path = create_search_paths( search_path_json, site_roots=get_site_roots() ) python_version_json = configuration_json.pop("python_version", None) if python_version_json is None: python_version = None elif isinstance(python_version_json, str): python_version = PythonVersion.from_string(python_version_json) else: raise InvalidConfiguration( "Expect python version to be a string but got" + f"'{python_version_json}'" ) shared_memory_json = ensure_option_type( configuration_json, "shared_memory", dict ) if shared_memory_json is None: shared_memory = SharedMemory() else: shared_memory = SharedMemory( heap_size=ensure_option_type(shared_memory_json, "heap_size", int), dependency_table_power=ensure_option_type( shared_memory_json, "dependency_table_power", int ), hash_table_power=ensure_option_type( shared_memory_json, "hash_table_power", int ), ) for unrecognized_key in shared_memory_json: LOG.warning(f"Unrecognized configuration item: {unrecognized_key}") source_directories_json = ensure_option_type( configuration_json, "source_directories", list ) if isinstance(source_directories_json, list): source_directories = [ element for json in source_directories_json for element in create_search_paths( json, site_roots=get_site_roots() ) ] else: source_directories = None ide_features_json = ensure_option_type( configuration_json, "ide_features", dict ) if ide_features_json is None: ide_features = None else: ide_features = IdeFeatures( hover_enabled=ensure_option_type( ide_features_json, "hover_enabled", bool ), go_to_definition_enabled=ensure_option_type( ide_features_json, "go_to_definition_enabled", bool ), ) for unrecognized_key in ide_features_json: LOG.warning(f"Unrecognized configuration item: {unrecognized_key}") unwatched_dependency_json = ensure_option_type( configuration_json, "unwatched_dependency", dict ) if unwatched_dependency_json is None: unwatched_dependency = None else: unwatched_dependency = UnwatchedDependency.from_json( unwatched_dependency_json ) partial_configuration = PartialConfiguration( binary=ensure_option_type(configuration_json, "binary", str), buck_mode=PlatformAware.from_json( ensure_optional_string_or_string_dict( configuration_json, "buck_mode" ), "buck_mode", ), disabled=ensure_option_type(configuration_json, "disabled", bool), do_not_ignore_errors_in=ensure_string_list( configuration_json, "do_not_ignore_errors_in" ), dot_pyre_directory=Path(dot_pyre_directory) if dot_pyre_directory is not None else None, excludes=ensure_string_list( configuration_json, "exclude", allow_single_string=True ), extensions=[ ExtensionElement.from_json(json) for json in ensure_list(configuration_json, "extensions") ], ide_features=ide_features, ignore_all_errors=ensure_string_list( configuration_json, "ignore_all_errors" ), ignore_infer=ensure_string_list(configuration_json, "ignore_infer"), isolation_prefix=ensure_option_type( configuration_json, "isolation_prefix", str ), logger=ensure_option_type(configuration_json, "logger", str), number_of_workers=ensure_option_type( configuration_json, "workers", int ), oncall=ensure_option_type(configuration_json, "oncall", str), other_critical_files=ensure_string_list( configuration_json, "critical_files" ), pysa_version_hash=ensure_option_type( configuration_json, "pysa_version", str ), python_version=python_version, shared_memory=shared_memory, search_path=search_path, source_directories=source_directories, strict=ensure_option_type(configuration_json, "strict", bool), taint_models_path=ensure_string_list( configuration_json, "taint_models_path", allow_single_string=True ), targets=ensure_optional_string_list(configuration_json, "targets"), typeshed=ensure_option_type(configuration_json, "typeshed", str), unwatched_dependency=unwatched_dependency, use_buck2=ensure_option_type(configuration_json, "use_buck2", bool), version_hash=ensure_option_type(configuration_json, "version", str), ) # Check for deprecated and unused keys for ( deprecated_key, replacement_key, ) in PartialConfiguration._get_depreacted_map().items(): if deprecated_key in configuration_json: configuration_json.pop(deprecated_key) LOG.warning( f"Configuration file uses deprecated item `{deprecated_key}`. " f"Please migrate to its replacement `{replacement_key}`" ) extra_keys = PartialConfiguration._get_extra_keys() for unrecognized_key in configuration_json: if unrecognized_key not in extra_keys: LOG.warning(f"Unrecognized configuration item: {unrecognized_key}") return partial_configuration except json.JSONDecodeError as error: raise InvalidConfiguration(f"Invalid JSON file: {error}") @staticmethod def from_file(path: Path) -> "PartialConfiguration": try: contents = path.read_text(encoding="utf-8") return PartialConfiguration.from_string(contents) except OSError as error: raise InvalidConfiguration(f"Error when reading {path}: {error}") def expand_relative_paths(self, root: str) -> "PartialConfiguration": binary = self.binary if binary is not None: binary = expand_relative_path(root, binary) logger = self.logger if logger is not None: logger = expand_relative_path(root, logger) source_directories = self.source_directories if source_directories is not None: source_directories = [ path.expand_relative_root(root) for path in source_directories ] typeshed = self.typeshed if typeshed is not None: typeshed = expand_relative_path(root, typeshed) unwatched_dependency = self.unwatched_dependency if unwatched_dependency is not None: files = unwatched_dependency.files unwatched_dependency = UnwatchedDependency( change_indicator=unwatched_dependency.change_indicator, files=UnwatchedFiles( root=expand_relative_path(root, files.root), checksum_path=files.checksum_path, ), ) return PartialConfiguration( binary=binary, buck_mode=self.buck_mode, disabled=self.disabled, do_not_ignore_errors_in=[ expand_relative_path(root, path) for path in self.do_not_ignore_errors_in ], dot_pyre_directory=self.dot_pyre_directory, excludes=self.excludes, extensions=self.extensions, ide_features=self.ide_features, ignore_all_errors=[ expand_relative_path(root, path) for path in self.ignore_all_errors ], ignore_infer=[ expand_relative_path(root, path) for path in self.ignore_infer ], isolation_prefix=self.isolation_prefix, logger=logger, number_of_workers=self.number_of_workers, oncall=self.oncall, other_critical_files=[ expand_relative_path(root, path) for path in self.other_critical_files ], pysa_version_hash=self.pysa_version_hash, python_version=self.python_version, shared_memory=self.shared_memory, search_path=[path.expand_relative_root(root) for path in self.search_path], source_directories=source_directories, strict=self.strict, taint_models_path=[ expand_relative_path(root, path) for path in self.taint_models_path ], targets=self.targets, typeshed=typeshed, unwatched_dependency=unwatched_dependency, use_buck2=self.use_buck2, version_hash=self.version_hash, ) def merge_partial_configurations( base: PartialConfiguration, override: PartialConfiguration ) -> PartialConfiguration: def overwrite_base(base: Optional[T], override: Optional[T]) -> Optional[T]: return base if override is None else override def overwrite_base_ide_features( base: Optional[IdeFeatures], override: Optional[IdeFeatures] ) -> Optional[IdeFeatures]: if override is None: return base if base is None: return override return IdeFeatures( hover_enabled=overwrite_base(base.hover_enabled, override.hover_enabled), go_to_definition_enabled=overwrite_base( base.go_to_definition_enabled, override.go_to_definition_enabled ), ) def prepend_base(base: Sequence[T], override: Sequence[T]) -> Sequence[T]: return list(override) + list(base) def raise_when_overridden( base: Optional[T], override: Optional[T], name: str ) -> Optional[T]: if base is None: return override elif override is None: return base else: raise InvalidConfiguration( f"Configuration option `{name}` cannot be overridden." ) return PartialConfiguration( binary=overwrite_base(base.binary, override.binary), buck_mode=PlatformAware.merge(base.buck_mode, override.buck_mode), disabled=overwrite_base(base.disabled, override.disabled), do_not_ignore_errors_in=prepend_base( base.do_not_ignore_errors_in, override.do_not_ignore_errors_in ), dot_pyre_directory=overwrite_base( base.dot_pyre_directory, override.dot_pyre_directory ), excludes=prepend_base(base.excludes, override.excludes), extensions=prepend_base(base.extensions, override.extensions), ide_features=overwrite_base_ide_features( base.ide_features, override.ide_features ), ignore_all_errors=prepend_base( base.ignore_all_errors, override.ignore_all_errors ), ignore_infer=prepend_base(base.ignore_infer, override=override.ignore_infer), isolation_prefix=overwrite_base( base.isolation_prefix, override.isolation_prefix ), logger=overwrite_base(base.logger, override.logger), number_of_workers=overwrite_base( base.number_of_workers, override.number_of_workers ), oncall=overwrite_base(base.oncall, override.oncall), other_critical_files=prepend_base( base.other_critical_files, override.other_critical_files ), pysa_version_hash=overwrite_base( base.pysa_version_hash, override.pysa_version_hash ), python_version=overwrite_base(base.python_version, override.python_version), shared_memory=SharedMemory( heap_size=overwrite_base( base.shared_memory.heap_size, override.shared_memory.heap_size ), dependency_table_power=overwrite_base( base.shared_memory.dependency_table_power, override.shared_memory.dependency_table_power, ), hash_table_power=overwrite_base( base.shared_memory.hash_table_power, override.shared_memory.hash_table_power, ), ), search_path=prepend_base(base.search_path, override.search_path), source_directories=raise_when_overridden( base.source_directories, override.source_directories, name="source_directories", ), strict=overwrite_base(base.strict, override.strict), taint_models_path=prepend_base( base.taint_models_path, override.taint_models_path ), targets=raise_when_overridden(base.targets, override.targets, name="targets"), typeshed=overwrite_base(base.typeshed, override.typeshed), unwatched_dependency=overwrite_base( base.unwatched_dependency, override.unwatched_dependency ), use_buck2=overwrite_base(base.use_buck2, override.use_buck2), version_hash=overwrite_base(base.version_hash, override.version_hash), ) @dataclass(frozen=True) class Configuration: project_root: str dot_pyre_directory: Path binary: Optional[str] = None buck_mode: Optional[PlatformAware[str]] = None disabled: bool = False do_not_ignore_errors_in: Sequence[str] = field(default_factory=list) excludes: Sequence[str] = field(default_factory=list) extensions: Sequence[ExtensionElement] = field(default_factory=list) ide_features: Optional[IdeFeatures] = None ignore_all_errors: Sequence[str] = field(default_factory=list) ignore_infer: Sequence[str] = field(default_factory=list) isolation_prefix: Optional[str] = None logger: Optional[str] = None number_of_workers: Optional[int] = None oncall: Optional[str] = None other_critical_files: Sequence[str] = field(default_factory=list) pysa_version_hash: Optional[str] = None python_version: Optional[PythonVersion] = None shared_memory: SharedMemory = SharedMemory() relative_local_root: Optional[str] = None search_path: Sequence[SearchPathElement] = field(default_factory=list) source_directories: Optional[Sequence[SearchPathElement]] = None strict: bool = False taint_models_path: Sequence[str] = field(default_factory=list) targets: Optional[Sequence[str]] = None typeshed: Optional[str] = None unwatched_dependency: Optional[UnwatchedDependency] = None use_buck2: bool = False version_hash: Optional[str] = None @staticmethod def from_partial_configuration( project_root: Path, relative_local_root: Optional[str], partial_configuration: PartialConfiguration, in_virtual_environment: Optional[bool] = None, ) -> "Configuration": search_path = partial_configuration.search_path if len(search_path) == 0 and _in_virtual_environment(in_virtual_environment): LOG.warning("Using virtual environment site-packages in search path...") search_path = [SimpleSearchPathElement(root) for root in get_site_roots()] return Configuration( project_root=str(project_root), dot_pyre_directory=_get_optional_value( partial_configuration.dot_pyre_directory, project_root / LOG_DIRECTORY ), binary=partial_configuration.binary, buck_mode=partial_configuration.buck_mode, disabled=_get_optional_value(partial_configuration.disabled, default=False), do_not_ignore_errors_in=partial_configuration.do_not_ignore_errors_in, excludes=partial_configuration.excludes, extensions=partial_configuration.extensions, ide_features=partial_configuration.ide_features, ignore_all_errors=partial_configuration.ignore_all_errors, ignore_infer=partial_configuration.ignore_infer, isolation_prefix=partial_configuration.isolation_prefix, logger=partial_configuration.logger, number_of_workers=partial_configuration.number_of_workers, oncall=partial_configuration.oncall, other_critical_files=partial_configuration.other_critical_files, pysa_version_hash=partial_configuration.pysa_version_hash, python_version=partial_configuration.python_version, shared_memory=partial_configuration.shared_memory, relative_local_root=relative_local_root, search_path=[ path.expand_global_root(str(project_root)) for path in search_path ], source_directories=partial_configuration.source_directories, strict=_get_optional_value(partial_configuration.strict, default=False), taint_models_path=partial_configuration.taint_models_path, targets=partial_configuration.targets, typeshed=partial_configuration.typeshed, unwatched_dependency=partial_configuration.unwatched_dependency, use_buck2=_get_optional_value( partial_configuration.use_buck2, default=False ), version_hash=partial_configuration.version_hash, ) @property def log_directory(self) -> str: if self.relative_local_root is None: return str(self.dot_pyre_directory) return str(self.dot_pyre_directory / self.relative_local_root) @property def local_root(self) -> Optional[str]: if self.relative_local_root is None: return None return os.path.join(self.project_root, self.relative_local_root) def to_json(self) -> Dict[str, object]: """ This method is for display purpose only. Do *NOT* expect this method to produce JSONs that can be de-serialized back into configurations. """ binary = self.binary buck_mode = self.buck_mode isolation_prefix = self.isolation_prefix logger = self.logger number_of_workers = self.number_of_workers oncall = self.oncall pysa_version_hash = self.pysa_version_hash python_version = self.python_version relative_local_root = self.relative_local_root source_directories = self.source_directories targets = self.targets typeshed = self.typeshed unwatched_dependency = self.unwatched_dependency version_hash = self.version_hash return { "global_root": self.project_root, "dot_pyre_directory": str(self.dot_pyre_directory), **({"binary": binary} if binary is not None else {}), **({"buck_mode": buck_mode.to_json()} if buck_mode is not None else {}), "disabled": self.disabled, "do_not_ignore_errors_in": list(self.do_not_ignore_errors_in), "excludes": list(self.excludes), "extensions": list(self.extensions), "ignore_all_errors": list(self.ignore_all_errors), "ignore_infer": list(self.ignore_infer), **( {"isolation_prefix": isolation_prefix} if isolation_prefix is not None else {} ), **({"logger": logger} if logger is not None else {}), **({"oncall": oncall} if oncall is not None else {}), **({"workers": number_of_workers} if number_of_workers is not None else {}), "other_critical_files": list(self.other_critical_files), **( {"pysa_version_hash": pysa_version_hash} if pysa_version_hash is not None else {} ), **( {"python_version": python_version.to_string()} if python_version is not None else {} ), **( {"shared_memory": self.shared_memory.to_json()} if self.shared_memory != SharedMemory() else {} ), **( {"relative_local_root": relative_local_root} if relative_local_root is not None else {} ), "search_path": [path.path() for path in self.search_path], **( {"source_directories": [path.path() for path in source_directories]} if source_directories is not None else {} ), "strict": self.strict, "taint_models_path": list(self.taint_models_path), **({"targets": list(targets)} if targets is not None else {}), **({"typeshed": typeshed} if typeshed is not None else {}), **( {"unwatched_dependency": unwatched_dependency.to_json()} if unwatched_dependency is not None else {} ), "use_buck2": self.use_buck2, **({"version_hash": version_hash} if version_hash is not None else {}), } def get_source_directories(self) -> List[SearchPathElement]: return list(self.source_directories or []) def get_existent_unwatched_dependency( self, ) -> Optional[UnwatchedDependency]: unwatched_dependency = self.unwatched_dependency if unwatched_dependency is None: return None unwatched_root = Path(unwatched_dependency.files.root) try: if not unwatched_root.is_dir(): LOG.warning( "Nonexistent directory passed in to `unwatched_dependency`: " f"`{unwatched_root}`" ) return None checksum_path = unwatched_root / unwatched_dependency.files.checksum_path if not checksum_path.is_file(): LOG.warning( "Nonexistent file passed in to `unwatched_dependency`: " f"`{checksum_path}`" ) return None return self.unwatched_dependency except PermissionError as error: LOG.warning(str(error)) return None # Expansion and validation of search paths cannot happen at Configuration creation # because link trees need to be built first. def expand_and_get_existent_search_paths(self) -> List[SearchPathElement]: existent_paths = _expand_and_get_existent_paths(self.search_path) typeshed_root = self.get_typeshed_respecting_override() typeshed_paths = ( [] if typeshed_root is None else [ SimpleSearchPathElement(str(element)) for element in find_directories.find_typeshed_search_paths( Path(typeshed_root) ) ] ) # pyre-ignore: Unsupported operand [58]: `+` is not supported for # operand types `List[SearchPathElement]` and `Union[List[typing.Any], # List[SimpleSearchPathElement]]` return existent_paths + typeshed_paths def expand_and_filter_nonexistent_paths(self) -> "Configuration": source_directories = self.source_directories return Configuration( project_root=self.project_root, dot_pyre_directory=self.dot_pyre_directory, binary=self.binary, buck_mode=self.buck_mode, disabled=self.disabled, do_not_ignore_errors_in=self.do_not_ignore_errors_in, excludes=self.excludes, extensions=self.extensions, ide_features=self.ide_features, ignore_all_errors=self.ignore_all_errors, ignore_infer=self.ignore_infer, isolation_prefix=self.isolation_prefix, logger=self.logger, number_of_workers=self.number_of_workers, oncall=self.oncall, other_critical_files=self.other_critical_files, pysa_version_hash=self.pysa_version_hash, python_version=self.python_version, shared_memory=self.shared_memory, relative_local_root=self.relative_local_root, search_path=self.search_path, source_directories=_expand_and_get_existent_paths(source_directories) if source_directories else None, strict=self.strict, taint_models_path=self.taint_models_path, targets=self.targets, typeshed=self.typeshed, unwatched_dependency=self.unwatched_dependency, use_buck2=self.use_buck2, version_hash=self.version_hash, ) def get_existent_ignore_infer_paths(self) -> List[str]: existent_paths = [] for path in self.ignore_infer: if os.path.exists(path): existent_paths.append(path) else: LOG.warn(f"Filtering out nonexistent path in `ignore_infer`: {path}") return existent_paths def get_existent_do_not_ignore_errors_in_paths(self) -> List[str]: """ This is a separate method because we want to check for existing files at the time this is called, not when the configuration is constructed. """ ignore_paths = [ _expand_global_root(path, global_root=self.project_root) for path in self.do_not_ignore_errors_in ] paths = [] for path in ignore_paths: if os.path.exists(path): paths.append(path) else: LOG.debug( "Filtering out nonexistent paths in `do_not_ignore_errors_in`: " f"{path}" ) return paths def get_existent_ignore_all_errors_paths(self) -> List[str]: """ This is a separate method because we want to check for existing files at the time this is called, not when the configuration is constructed. """ return _expand_and_get_existent_ignore_all_errors_path( self.ignore_all_errors, self.project_root ) def get_binary_respecting_override(self) -> Optional[str]: binary = self.binary if binary is not None: return binary LOG.info(f"No binary specified, looking for `{BINARY_NAME}` in PATH") binary_candidate = shutil.which(BINARY_NAME) if binary_candidate is None: binary_candidate_name = os.path.join( os.path.dirname(sys.argv[0]), BINARY_NAME ) binary_candidate = shutil.which(binary_candidate_name) if binary_candidate is not None: return binary_candidate return None def get_typeshed_respecting_override(self) -> Optional[str]: typeshed = self.typeshed if typeshed is not None: return typeshed LOG.info("No typeshed specified, looking for it...") auto_determined_typeshed = find_directories.find_typeshed() if auto_determined_typeshed is None: LOG.warning( "Could not find a suitable typeshed. Types for Python builtins " "and standard libraries may be missing!" ) return None else: LOG.info(f"Found: `{auto_determined_typeshed}`") return str(auto_determined_typeshed) def get_version_hash_respecting_override(self) -> Optional[str]: overriding_version_hash = os.getenv("PYRE_VERSION_HASH") if overriding_version_hash: LOG.warning(f"Version hash overridden with `{overriding_version_hash}`") return overriding_version_hash return self.version_hash def get_binary_version(self) -> Optional[str]: binary = self.get_binary_respecting_override() if binary is None: return None status = subprocess.run( [binary, "-version"], stdout=subprocess.PIPE, universal_newlines=True ) return status.stdout.strip() if status.returncode == 0 else None def get_number_of_workers(self) -> int: number_of_workers = self.number_of_workers if number_of_workers is not None and number_of_workers > 0: return number_of_workers try: default_number_of_workers = max(multiprocessing.cpu_count() - 4, 1) except NotImplementedError: default_number_of_workers = 4 LOG.info( "Could not determine the number of Pyre workers from configuration. " f"Auto-set the value to {default_number_of_workers}." ) return default_number_of_workers def is_hover_enabled(self) -> bool: if self.ide_features is None: return IdeFeatures.DEFAULT_HOVER_ENABLED return self.ide_features.is_hover_enabled() def is_go_to_definition_enabled(self) -> bool: if self.ide_features is None: return IdeFeatures.DEFAULT_GO_TO_DEFINITION_ENABLED return self.ide_features.is_go_to_definition_enabled() def get_valid_extension_suffixes(self) -> List[str]: vaild_extensions = [] for extension in self.extensions: if not extension.suffix.startswith("."): LOG.warning( "Filtering out extension which does not start with `.`: " f"`{extension.suffix}`" ) else: vaild_extensions.append(extension.command_line_argument()) return vaild_extensions def get_isolation_prefix_respecting_override(self) -> Optional[str]: """We need this to disable an isolation prefix set in a configuration. Merely omitting the CLI flag would not disable the isolation prefix because we would just fall back to the configuration value. With this, we can pass `--isolation-prefix ''` as a CLI argument or override `isolation_prefix` as `""` in a local configuration.""" return None if self.isolation_prefix == "" else self.isolation_prefix def get_python_version(self) -> PythonVersion: python_version = self.python_version if python_version is not None: return python_version else: version_info = sys.version_info return PythonVersion( major=version_info.major, minor=version_info.minor, micro=version_info.micro, ) def create_configuration( arguments: command_arguments.CommandArguments, base_directory: Path ) -> Configuration: local_root_argument = arguments.local_configuration search_base = ( base_directory if local_root_argument is None else base_directory / local_root_argument ) found_root = find_directories.find_global_and_local_root(search_base) # If the local root was explicitly specified but does not exist, return an # error instead of falling back to current directory. if local_root_argument is not None: if found_root is None: raise InvalidConfiguration( "A local configuration path was explicitly specified, but no" + f" {CONFIGURATION_FILE} file was found in {search_base}" + " or its parents." ) elif found_root.local_root is None: raise InvalidConfiguration( "A local configuration path was explicitly specified, but no" + f" {LOCAL_CONFIGURATION_FILE} file was found in {search_base}" + " or its parents." ) command_argument_configuration = PartialConfiguration.from_command_arguments( arguments ).expand_relative_paths(str(Path.cwd())) if found_root is None: project_root = Path.cwd() relative_local_root = None partial_configuration = command_argument_configuration else: project_root = found_root.global_root relative_local_root = None partial_configuration = PartialConfiguration.from_file( project_root / CONFIGURATION_FILE ).expand_relative_paths(str(project_root)) local_root = found_root.local_root if local_root is not None: relative_local_root = get_relative_local_root(project_root, local_root) partial_configuration = merge_partial_configurations( base=partial_configuration, override=PartialConfiguration.from_file( local_root / LOCAL_CONFIGURATION_FILE ).expand_relative_paths(str(local_root)), ) partial_configuration = merge_partial_configurations( base=partial_configuration, override=command_argument_configuration, ) configuration = Configuration.from_partial_configuration( project_root, relative_local_root, partial_configuration ) return configuration.expand_and_filter_nonexistent_paths() def check_nested_local_configuration(configuration: Configuration) -> None: """ Raises `InvalidConfiguration` if the check fails. """ local_root = configuration.local_root if local_root is None: return def is_subdirectory(child: Path, parent: Path) -> bool: return parent == child or parent in child.parents # We search from the parent of the local root, looking for another local # configuration file that lives above the current one local_root_path = Path(local_root).resolve() current_directory = local_root_path.parent while True: found_root = find_directories.find_global_and_local_root(current_directory) if found_root is None: break nesting_local_root = found_root.local_root if nesting_local_root is None: break nesting_configuration = PartialConfiguration.from_file( nesting_local_root / LOCAL_CONFIGURATION_FILE ).expand_relative_paths(str(nesting_local_root)) nesting_ignored_all_errors_path = ( _expand_and_get_existent_ignore_all_errors_path( nesting_configuration.ignore_all_errors, str(found_root.global_root) ) ) if not any( is_subdirectory(child=local_root_path, parent=Path(path)) for path in nesting_ignored_all_errors_path ): error_message = ( "Local configuration is nested under another local configuration at " f"`{nesting_local_root}`.\nPlease add `{local_root_path}` to the " "`ignore_all_errors` field of the parent, or combine the sources " "into a single configuration, or split the parent configuration to " "avoid inconsistent errors." ) raise InvalidConfiguration(error_message) current_directory = nesting_local_root.parent def check_open_source_version(configuration: Configuration) -> None: """ Check if version specified in configuration matches running version and warn if it does not. """ expected_version = configuration.version_hash if expected_version is None or not re.match(r"\d+\.\d+\.\d+", expected_version): return try: # pyre-ignore[21]: dynamic import from pyre_check import __version__ as actual_version if expected_version != actual_version: LOG.warning( textwrap.dedent( f"""\ Your running version does not match the configured version for this project (running {actual_version}, expected {expected_version}).""" ) ) except ImportError: pass