# client/configuration.py
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# pyre-unsafe
import abc
import dataclasses
import glob
import json
import logging
import multiprocessing
import os
import platform
import re
import shutil
import site
import subprocess
import sys
import textwrap
from dataclasses import dataclass, field
from logging import Logger
from pathlib import Path
from typing import (
Any,
Callable,
ClassVar,
Dict,
Generic,
Iterable,
List,
Optional,
Sequence,
Set,
Type,
TypeVar,
Union,
)
from . import command_arguments, find_directories
from .exceptions import EnvironmentException
from .filesystem import assert_readable_directory, expand_relative_path
from .find_directories import (
BINARY_NAME,
CONFIGURATION_FILE,
LOCAL_CONFIGURATION_FILE,
LOG_DIRECTORY,
get_relative_local_root,
)
LOG: Logger = logging.getLogger(__name__)
T = TypeVar("T")
def _expand_global_root(path: str, global_root: str) -> str:
if path.startswith("//"):
return expand_relative_path(global_root, path[2:])
return path
def _expand_relative_root(path: str, relative_root: str) -> str:
if not path.startswith("//"):
return expand_relative_path(relative_root, path)
return path
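# Illustrative sketch of the `//` convention used by the two helpers above,
# assuming `expand_relative_path` joins a relative path onto the given root:
#
#   _expand_global_root("//tools/pyre", global_root="/repo")
#   # -> "/repo/tools/pyre"  (a leading `//` anchors the path to the global root)
#   _expand_relative_root("tools/pyre", relative_root="/repo/local")
#   # -> "/repo/local/tools/pyre"
#   _expand_relative_root("//tools/pyre", relative_root="/repo/local")
#   # -> "//tools/pyre"  (left as-is, to be expanded against the global root later)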
def _get_optional_value(source: Optional[T], default: T) -> T:
return source if source is not None else default
def _expand_and_get_existent_ignore_all_errors_path(
ignore_all_errors: Iterable[str], project_root: str
) -> List[str]:
expanded_ignore_paths = []
for path in ignore_all_errors:
expanded = glob.glob(_expand_global_root(path, global_root=project_root))
if not expanded:
expanded_ignore_paths.append(path)
else:
expanded_ignore_paths.extend(expanded)
paths = []
for path in expanded_ignore_paths:
if os.path.exists(path):
paths.append(path)
else:
LOG.warning(f"Nonexistent paths passed in to `ignore_all_errors`: `{path}`")
return paths
class InvalidConfiguration(Exception):
def __init__(self, message: str) -> None:
self.message = f"Invalid configuration: {message}"
super().__init__(self.message)
class InvalidPythonVersion(InvalidConfiguration):
def __init__(self, message: str) -> None:
super().__init__(message)
class SearchPathElement(abc.ABC):
@abc.abstractmethod
def path(self) -> str:
raise NotImplementedError
@abc.abstractmethod
def get_root(self) -> str:
raise NotImplementedError
@abc.abstractmethod
def command_line_argument(self) -> str:
raise NotImplementedError
@abc.abstractmethod
def expand_global_root(self, global_root: str) -> "SearchPathElement":
raise NotImplementedError
@abc.abstractmethod
def expand_relative_root(self, relative_root: str) -> "SearchPathElement":
raise NotImplementedError
@abc.abstractmethod
def expand_glob(self) -> List["SearchPathElement"]:
raise NotImplementedError
@dataclasses.dataclass(frozen=True)
class SimpleSearchPathElement(SearchPathElement):
root: str
def path(self) -> str:
return self.root
def get_root(self) -> str:
return self.root
def command_line_argument(self) -> str:
return self.root
def expand_global_root(self, global_root: str) -> SearchPathElement:
return SimpleSearchPathElement(
_expand_global_root(self.root, global_root=global_root)
)
def expand_relative_root(self, relative_root: str) -> SearchPathElement:
return SimpleSearchPathElement(
_expand_relative_root(self.root, relative_root=relative_root)
)
def expand_glob(self) -> List[SearchPathElement]:
expanded = sorted(glob.glob(self.get_root()))
if expanded:
return [SimpleSearchPathElement(path) for path in expanded]
else:
LOG.warning(f"'{self.path()}' does not match any paths.")
return []
@dataclasses.dataclass(frozen=True)
class SubdirectorySearchPathElement(SearchPathElement):
root: str
subdirectory: str
def path(self) -> str:
return os.path.join(self.root, self.subdirectory)
def get_root(self) -> str:
return self.root
def command_line_argument(self) -> str:
return self.root + "$" + self.subdirectory
def expand_global_root(self, global_root: str) -> SearchPathElement:
return SubdirectorySearchPathElement(
root=_expand_global_root(self.root, global_root=global_root),
subdirectory=self.subdirectory,
)
def expand_relative_root(self, relative_root: str) -> SearchPathElement:
return SubdirectorySearchPathElement(
root=_expand_relative_root(self.root, relative_root=relative_root),
subdirectory=self.subdirectory,
)
def expand_glob(self) -> List["SearchPathElement"]:
return [self]
@dataclasses.dataclass(frozen=True)
class SitePackageSearchPathElement(SearchPathElement):
site_root: str
package_name: str
is_toplevel_module: bool = False
def package_path(self) -> str:
module_suffix = ".py" if self.is_toplevel_module else ""
return self.package_name + module_suffix
def path(self) -> str:
return os.path.join(self.site_root, self.package_path())
def get_root(self) -> str:
return self.site_root
def command_line_argument(self) -> str:
return self.site_root + "$" + self.package_path()
def expand_global_root(self, global_root: str) -> SearchPathElement:
# Site package does not participate in root expansion.
return self
def expand_relative_root(self, relative_root: str) -> SearchPathElement:
# Site package does not participate in root expansion.
return self
def expand_glob(self) -> List["SearchPathElement"]:
return [self]
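# Illustrative command-line encodings for the three element kinds above
# (all paths are hypothetical):
#
#   SimpleSearchPathElement("/repo/src").command_line_argument()
#   # -> "/repo/src"
#   SubdirectorySearchPathElement(root="/repo", subdirectory="src").command_line_argument()
#   # -> "/repo$src"
#   SitePackageSearchPathElement(
#       site_root="/venv/site-packages", package_name="six", is_toplevel_module=True
#   ).command_line_argument()
#   # -> "/venv/site-packages$six.py"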
@dataclasses.dataclass
class ExtensionElement:
suffix: str
include_suffix_in_module_qualifier: bool
def command_line_argument(self) -> str:
options = ""
if self.include_suffix_in_module_qualifier:
options = "$" + "include_suffix_in_module_qualifier"
return self.suffix + options
@staticmethod
def from_json(json: Union[str, Dict[str, Union[str, bool]]]) -> "ExtensionElement":
if isinstance(json, str):
return ExtensionElement(
suffix=json, include_suffix_in_module_qualifier=False
)
elif isinstance(json, dict):
include_suffix_in_module_qualifier = False
if "include_suffix_in_module_qualifier" in json:
value = json["include_suffix_in_module_qualifier"]
if isinstance(value, bool):
include_suffix_in_module_qualifier = value
if "suffix" in json:
suffix = json["suffix"]
if isinstance(suffix, str):
return ExtensionElement(
suffix=suffix,
include_suffix_in_module_qualifier=include_suffix_in_module_qualifier,
)
raise InvalidConfiguration(f"Invalid extension element: {json}")
def get_site_roots() -> List[str]:
try:
return site.getsitepackages() + [site.getusersitepackages()]
except AttributeError:
# There are a few Python versions that ship with a broken venv,
# where `getsitepackages` is not available.
LOG.warning(
"Either `site.getusersitepackages()` or `site.getsitepackages()` "
+ "is not available in your virtualenv. This is a known virtualenv "
+ 'bug and as a workaround please avoid using `"site-package"` in '
+ "your search path configuration."
)
return []
def create_search_paths(
json: Union[str, Dict[str, Union[str, bool]]], site_roots: Iterable[str]
) -> List[SearchPathElement]:
if isinstance(json, str):
return [SimpleSearchPathElement(json)]
elif isinstance(json, dict):
if "root" in json and "subdirectory" in json:
return [
SubdirectorySearchPathElement(
root=str(json["root"]), subdirectory=str(json["subdirectory"])
)
]
if "import_root" in json and "source" in json:
return [
SubdirectorySearchPathElement(
root=str(json["import_root"]), subdirectory=str(json["source"])
)
]
elif "site-package" in json:
is_toplevel_module = (
"is_toplevel_module" in json and json["is_toplevel_module"]
)
return [
SitePackageSearchPathElement(
site_root=root,
package_name=str(json["site-package"]),
is_toplevel_module=bool(is_toplevel_module),
)
for root in site_roots
]
raise InvalidConfiguration(f"Invalid search path element: {json}")
def assert_readable_directory_in_configuration(
directory: str, field_name: str = ""
) -> None:
try:
assert_readable_directory(directory, error_message_prefix=f"{field_name} ")
except EnvironmentException as error:
raise InvalidConfiguration(str(error))
def _in_virtual_environment(override: Optional[bool] = None) -> bool:
if override is not None:
return override
return sys.prefix != sys.base_prefix
def _expand_and_get_existent_paths(
paths: Sequence[SearchPathElement],
) -> List[SearchPathElement]:
expanded_search_paths = [
expanded_path
for search_path_element in paths
for expanded_path in search_path_element.expand_glob()
]
existent_paths = []
for search_path_element in expanded_search_paths:
search_path = search_path_element.path()
if os.path.exists(search_path):
existent_paths.append(search_path_element)
else:
LOG.warning(f"Path does not exist: {search_path}")
return existent_paths
PLATFORM_MAPPING = {
"Darwin": "macos",
"Windows": "windows",
"Linux": "linux",
"default": "default",
}
@dataclass(frozen=True)
class PlatformAware(Generic[T]):
default: T
windows: Optional[T] = None
macos: Optional[T] = None
linux: Optional[T] = None
@staticmethod
def from_json(
value: Union[T, Dict[str, T]], field_name: str
) -> Optional["PlatformAware"]:
if value is None:
return None
elif isinstance(value, Dict):
if len(value) == 0:
return None
invalid_keys = value.keys() - PLATFORM_MAPPING.values()
            if invalid_keys:
raise InvalidConfiguration(
f"Configuration `{field_name}` only supports platforms: "
f"{PLATFORM_MAPPING.values()} but got: `{invalid_keys}`."
)
default = (
value["default"]
if "default" in value
else value[sorted(value.keys())[0]]
)
return PlatformAware(
default=default,
windows=value["windows"] if "windows" in value else None,
macos=value["macos"] if "macos" in value else None,
linux=value["linux"] if "linux" in value else None,
)
else:
return PlatformAware(default=value)
@staticmethod
def merge(
base: Optional["PlatformAware"], override: Optional["PlatformAware"]
) -> Optional["PlatformAware"]:
if base is None:
return override
elif override is None:
return base
else:
return PlatformAware(
default=override.default,
windows=override.windows
if override.windows is not None
else base.windows,
macos=override.macos if override.macos is not None else base.macos,
linux=override.linux if override.linux is not None else base.linux,
)
def get(self, key: Optional[str] = None) -> T:
if key is None:
key = PLATFORM_MAPPING[platform.system()]
value: T = self.__getattribute__(key)
return value if value is not None else self.default
    def validate_values(self, check: Callable[[T], bool]) -> bool:
return (
check(self.default)
and (check(self.windows) if self.windows is not None else True)
and (check(self.macos) if self.macos is not None else True)
and (check(self.linux) if self.linux is not None else True)
)
    def to_json(self) -> Dict[str, T]:
result = {"default": self.default}
if self.windows is not None:
result["windows"] = self.windows
if self.linux is not None:
result["linux"] = self.linux
if self.macos is not None:
result["macos"] = self.macos
return result
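# A minimal sketch of how platform-aware options resolve (values hypothetical):
#
#   buck_mode = PlatformAware.from_json({"default": "opt", "windows": "dev"}, "buck_mode")
#   buck_mode.get("windows")  # -> "dev"
#   buck_mode.get("linux")    # -> "opt" (unset platforms fall back to `default`)
#
# When merging, `merge` keeps per-platform values from `base` unless `override`
# sets them, while `default` always comes from `override`.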
@dataclass(frozen=True)
class PythonVersion:
major: int
minor: int = 0
micro: int = 0
@staticmethod
def from_string(input: str) -> "PythonVersion":
try:
splits = input.split(".")
if len(splits) == 1:
return PythonVersion(major=int(splits[0]))
elif len(splits) == 2:
return PythonVersion(major=int(splits[0]), minor=int(splits[1]))
elif len(splits) == 3:
return PythonVersion(
major=int(splits[0]), minor=int(splits[1]), micro=int(splits[2])
)
raise InvalidPythonVersion(
"Version string is expected to have the form of 'X.Y.Z' but got "
+ f"'{input}'"
)
except ValueError as error:
raise InvalidPythonVersion(str(error))
def to_string(self) -> str:
return f"{self.major}.{self.minor}.{self.micro}"
@dataclass(frozen=True)
class SharedMemory:
heap_size: Optional[int] = None
dependency_table_power: Optional[int] = None
hash_table_power: Optional[int] = None
def to_json(self) -> Dict[str, int]:
heap_size = self.heap_size
dependency_table_power = self.dependency_table_power
hash_table_power = self.hash_table_power
return {
**({"heap_size": heap_size} if heap_size is not None else {}),
**(
{"dependency_table_power": dependency_table_power}
if dependency_table_power is not None
else {}
),
**(
{"hash_table_power": hash_table_power}
if hash_table_power is not None
else {}
),
}
@dataclass(frozen=True)
class IdeFeatures:
hover_enabled: Optional[bool] = None
DEFAULT_HOVER_ENABLED: ClassVar[bool] = False
go_to_definition_enabled: Optional[bool] = None
DEFAULT_GO_TO_DEFINITION_ENABLED: ClassVar[bool] = False
    def to_json(self) -> Dict[str, bool]:
return {
**(
{"hover_enabled": self.hover_enabled}
if self.hover_enabled is not None
else {}
),
**(
{"go_to_definition_enabled": self.go_to_definition_enabled}
if self.go_to_definition_enabled is not None
else {}
),
}
def is_hover_enabled(self) -> bool:
return (
self.hover_enabled
if self.hover_enabled is not None
else self.DEFAULT_HOVER_ENABLED
)
def is_go_to_definition_enabled(self) -> bool:
return (
self.go_to_definition_enabled
if self.go_to_definition_enabled is not None
else self.DEFAULT_GO_TO_DEFINITION_ENABLED
)
@dataclass(frozen=True)
class UnwatchedFiles:
root: str
checksum_path: str
@staticmethod
def from_json(json: Dict[str, object]) -> "UnwatchedFiles":
root = json.get("root", None)
if root is None:
raise InvalidConfiguration("Missing `root` field in UnwatchedFiles")
if not isinstance(root, str):
raise InvalidConfiguration(
"`root` field in UnwatchedFiles must be a string"
)
checksum_path = json.get("checksum_path", None)
if checksum_path is None:
raise InvalidConfiguration(
"Missing `checksum_path` field in UnwatchedFiles"
)
if not isinstance(checksum_path, str):
raise InvalidConfiguration(
"`checksum_path` field in UnwatchedFiles must be a string"
)
return UnwatchedFiles(root=root, checksum_path=checksum_path)
def to_json(self) -> Dict[str, str]:
return {
"root": self.root,
"checksum_path": self.checksum_path,
}
@dataclass(frozen=True)
class UnwatchedDependency:
change_indicator: str
files: UnwatchedFiles
@staticmethod
def from_json(json: Dict[str, object]) -> "UnwatchedDependency":
change_indicator = json.get("change_indicator", None)
if change_indicator is None:
raise InvalidConfiguration(
"Missing `change_indicator` field in UnwatchedDependency"
)
if not isinstance(change_indicator, str):
raise InvalidConfiguration(
"`change_indicator` field in UnwatchedDependency must be a string"
)
files_json = json.get("files", None)
if files_json is None:
raise InvalidConfiguration("Missing `files` field in UnwatchedDependency")
if not isinstance(files_json, dict):
raise InvalidConfiguration(
"`files` field in UnwatchedDependency must be a dict"
)
return UnwatchedDependency(
change_indicator=change_indicator,
files=UnwatchedFiles.from_json(files_json),
)
def to_json(self) -> Dict[str, object]:
return {
"change_indicator": str(self.change_indicator),
"files": self.files.to_json(),
}
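# The configuration JSON this expects, sketched with hypothetical paths:
#
#   UnwatchedDependency.from_json(
#       {
#           "change_indicator": "requirements.txt",
#           "files": {"root": "/repo/third_party", "checksum_path": "CHECKSUMS"},
#       }
#   )
#   # -> UnwatchedDependency(
#   #        change_indicator="requirements.txt",
#   #        files=UnwatchedFiles(root="/repo/third_party", checksum_path="CHECKSUMS"),
#   #    )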
@dataclass(frozen=True)
class PartialConfiguration:
binary: Optional[str] = None
buck_mode: Optional[PlatformAware[str]] = None
disabled: Optional[bool] = None
do_not_ignore_errors_in: Sequence[str] = field(default_factory=list)
dot_pyre_directory: Optional[Path] = None
excludes: Sequence[str] = field(default_factory=list)
extensions: Sequence[ExtensionElement] = field(default_factory=list)
ide_features: Optional[IdeFeatures] = None
ignore_all_errors: Sequence[str] = field(default_factory=list)
ignore_infer: Sequence[str] = field(default_factory=list)
isolation_prefix: Optional[str] = None
logger: Optional[str] = None
number_of_workers: Optional[int] = None
oncall: Optional[str] = None
other_critical_files: Sequence[str] = field(default_factory=list)
pysa_version_hash: Optional[str] = None
python_version: Optional[PythonVersion] = None
shared_memory: SharedMemory = SharedMemory()
search_path: Sequence[SearchPathElement] = field(default_factory=list)
source_directories: Optional[Sequence[SearchPathElement]] = None
strict: Optional[bool] = None
taint_models_path: Sequence[str] = field(default_factory=list)
targets: Optional[Sequence[str]] = None
typeshed: Optional[str] = None
unwatched_dependency: Optional[UnwatchedDependency] = None
use_buck2: Optional[bool] = None
version_hash: Optional[str] = None
@staticmethod
    def _get_deprecated_map() -> Dict[str, str]:
return {"do_not_check": "ignore_all_errors"}
@staticmethod
def _get_extra_keys() -> Set[str]:
return {
"create_open_source_configuration",
"saved_state",
"stable_client",
"taint_models_path",
"unstable_client",
}
@staticmethod
def from_command_arguments(
arguments: command_arguments.CommandArguments,
) -> "PartialConfiguration":
strict: Optional[bool] = True if arguments.strict else None
source_directories = [
SimpleSearchPathElement(element) for element in arguments.source_directories
] or None
targets: Optional[List[str]] = (
arguments.targets if len(arguments.targets) > 0 else None
)
python_version_string = arguments.python_version
ide_features = (
IdeFeatures(
hover_enabled=arguments.enable_hover,
go_to_definition_enabled=arguments.enable_go_to_definition,
)
if arguments.enable_hover is not None
or arguments.enable_go_to_definition is not None
else None
)
return PartialConfiguration(
binary=arguments.binary,
buck_mode=PlatformAware.from_json(arguments.buck_mode, "buck_mode"),
disabled=None,
do_not_ignore_errors_in=arguments.do_not_ignore_errors_in,
dot_pyre_directory=arguments.dot_pyre_directory,
excludes=arguments.exclude,
extensions=[],
ide_features=ide_features,
ignore_all_errors=[],
ignore_infer=[],
isolation_prefix=arguments.isolation_prefix,
logger=arguments.logger,
number_of_workers=arguments.number_of_workers,
oncall=None,
other_critical_files=[],
pysa_version_hash=None,
python_version=(
PythonVersion.from_string(python_version_string)
if python_version_string is not None
else None
),
shared_memory=SharedMemory(
heap_size=arguments.shared_memory_heap_size,
dependency_table_power=arguments.shared_memory_dependency_table_power,
hash_table_power=arguments.shared_memory_hash_table_power,
),
search_path=[
SimpleSearchPathElement(element) for element in arguments.search_path
],
source_directories=source_directories,
strict=strict,
taint_models_path=[],
targets=targets,
typeshed=arguments.typeshed,
unwatched_dependency=None,
use_buck2=arguments.use_buck2,
version_hash=None,
)
@staticmethod
def from_string(contents: str) -> "PartialConfiguration":
def is_list_of_string(elements: object) -> bool:
return isinstance(elements, list) and all(
isinstance(element, str) for element in elements
)
def ensure_option_type(
json: Dict[str, Any], name: str, expected_type: Type[T]
) -> Optional[T]:
result = json.pop(name, None)
if result is None:
return None
elif isinstance(result, expected_type):
return result
raise InvalidConfiguration(
f"Configuration `{name}` is expected to have type "
f"{expected_type} but got: `{json}`."
)
def ensure_optional_string_or_string_dict(
json: Dict[str, Any], name: str
) -> Optional[Union[Dict[str, str], str]]:
result = json.pop(name, None)
if result is None:
return None
elif isinstance(result, str):
return result
elif isinstance(result, Dict):
for value in result.values():
if not isinstance(value, str):
raise InvalidConfiguration(
f"Configuration `{name}` is expected to be a "
+ f"dict of strings but got `{json}`."
)
return result
raise InvalidConfiguration(
f"Configuration `{name}` is expected to be a string or a "
+ f"dict of strings but got `{json}`."
)
def ensure_optional_string_list(
json: Dict[str, Any], name: str
) -> Optional[List[str]]:
result = json.pop(name, None)
if result is None:
return None
elif is_list_of_string(result):
return result
raise InvalidConfiguration(
f"Configuration `{name}` is expected to be a list of "
+ f"strings but got `{json}`."
)
def ensure_string_list(
json: Dict[str, Any], name: str, allow_single_string: bool = False
) -> List[str]:
result = json.pop(name, [])
if allow_single_string and isinstance(result, str):
result = [result]
if is_list_of_string(result):
return result
raise InvalidConfiguration(
f"Configuration `{name}` is expected to be a list of "
+ f"strings but got `{json}`."
)
def ensure_list(json: Dict[str, Any], name: str) -> List[Any]:
result = json.pop(name, [])
if isinstance(result, list):
return result
raise InvalidConfiguration(
f"Configuration `{name}` is expected to be a list but got `{json}`."
)
try:
configuration_json = json.loads(contents)
dot_pyre_directory = ensure_option_type(
configuration_json, "dot_pyre_directory", str
)
search_path_json = configuration_json.pop("search_path", [])
if isinstance(search_path_json, list):
search_path = [
element
for json in search_path_json
for element in create_search_paths(
json, site_roots=get_site_roots()
)
]
else:
search_path = create_search_paths(
search_path_json, site_roots=get_site_roots()
)
python_version_json = configuration_json.pop("python_version", None)
if python_version_json is None:
python_version = None
elif isinstance(python_version_json, str):
python_version = PythonVersion.from_string(python_version_json)
else:
raise InvalidConfiguration(
"Expect python version to be a string but got"
+ f"'{python_version_json}'"
)
shared_memory_json = ensure_option_type(
configuration_json, "shared_memory", dict
)
if shared_memory_json is None:
shared_memory = SharedMemory()
else:
shared_memory = SharedMemory(
heap_size=ensure_option_type(shared_memory_json, "heap_size", int),
dependency_table_power=ensure_option_type(
shared_memory_json, "dependency_table_power", int
),
hash_table_power=ensure_option_type(
shared_memory_json, "hash_table_power", int
),
)
for unrecognized_key in shared_memory_json:
LOG.warning(f"Unrecognized configuration item: {unrecognized_key}")
source_directories_json = ensure_option_type(
configuration_json, "source_directories", list
)
if isinstance(source_directories_json, list):
source_directories = [
element
for json in source_directories_json
for element in create_search_paths(
json, site_roots=get_site_roots()
)
]
else:
source_directories = None
ide_features_json = ensure_option_type(
configuration_json, "ide_features", dict
)
if ide_features_json is None:
ide_features = None
else:
ide_features = IdeFeatures(
hover_enabled=ensure_option_type(
ide_features_json, "hover_enabled", bool
),
go_to_definition_enabled=ensure_option_type(
ide_features_json, "go_to_definition_enabled", bool
),
)
for unrecognized_key in ide_features_json:
LOG.warning(f"Unrecognized configuration item: {unrecognized_key}")
unwatched_dependency_json = ensure_option_type(
configuration_json, "unwatched_dependency", dict
)
if unwatched_dependency_json is None:
unwatched_dependency = None
else:
unwatched_dependency = UnwatchedDependency.from_json(
unwatched_dependency_json
)
partial_configuration = PartialConfiguration(
binary=ensure_option_type(configuration_json, "binary", str),
buck_mode=PlatformAware.from_json(
ensure_optional_string_or_string_dict(
configuration_json, "buck_mode"
),
"buck_mode",
),
disabled=ensure_option_type(configuration_json, "disabled", bool),
do_not_ignore_errors_in=ensure_string_list(
configuration_json, "do_not_ignore_errors_in"
),
dot_pyre_directory=Path(dot_pyre_directory)
if dot_pyre_directory is not None
else None,
excludes=ensure_string_list(
configuration_json, "exclude", allow_single_string=True
),
extensions=[
ExtensionElement.from_json(json)
for json in ensure_list(configuration_json, "extensions")
],
ide_features=ide_features,
ignore_all_errors=ensure_string_list(
configuration_json, "ignore_all_errors"
),
ignore_infer=ensure_string_list(configuration_json, "ignore_infer"),
isolation_prefix=ensure_option_type(
configuration_json, "isolation_prefix", str
),
logger=ensure_option_type(configuration_json, "logger", str),
number_of_workers=ensure_option_type(
configuration_json, "workers", int
),
oncall=ensure_option_type(configuration_json, "oncall", str),
other_critical_files=ensure_string_list(
configuration_json, "critical_files"
),
pysa_version_hash=ensure_option_type(
configuration_json, "pysa_version", str
),
python_version=python_version,
shared_memory=shared_memory,
search_path=search_path,
source_directories=source_directories,
strict=ensure_option_type(configuration_json, "strict", bool),
taint_models_path=ensure_string_list(
configuration_json, "taint_models_path", allow_single_string=True
),
targets=ensure_optional_string_list(configuration_json, "targets"),
typeshed=ensure_option_type(configuration_json, "typeshed", str),
unwatched_dependency=unwatched_dependency,
use_buck2=ensure_option_type(configuration_json, "use_buck2", bool),
version_hash=ensure_option_type(configuration_json, "version", str),
)
# Check for deprecated and unused keys
for (
deprecated_key,
replacement_key,
            ) in PartialConfiguration._get_deprecated_map().items():
if deprecated_key in configuration_json:
configuration_json.pop(deprecated_key)
LOG.warning(
f"Configuration file uses deprecated item `{deprecated_key}`. "
f"Please migrate to its replacement `{replacement_key}`"
)
extra_keys = PartialConfiguration._get_extra_keys()
for unrecognized_key in configuration_json:
if unrecognized_key not in extra_keys:
LOG.warning(f"Unrecognized configuration item: {unrecognized_key}")
return partial_configuration
except json.JSONDecodeError as error:
raise InvalidConfiguration(f"Invalid JSON file: {error}")
@staticmethod
def from_file(path: Path) -> "PartialConfiguration":
try:
contents = path.read_text(encoding="utf-8")
return PartialConfiguration.from_string(contents)
except OSError as error:
raise InvalidConfiguration(f"Error when reading {path}: {error}")
def expand_relative_paths(self, root: str) -> "PartialConfiguration":
binary = self.binary
if binary is not None:
binary = expand_relative_path(root, binary)
logger = self.logger
if logger is not None:
logger = expand_relative_path(root, logger)
source_directories = self.source_directories
if source_directories is not None:
source_directories = [
path.expand_relative_root(root) for path in source_directories
]
typeshed = self.typeshed
if typeshed is not None:
typeshed = expand_relative_path(root, typeshed)
unwatched_dependency = self.unwatched_dependency
if unwatched_dependency is not None:
files = unwatched_dependency.files
unwatched_dependency = UnwatchedDependency(
change_indicator=unwatched_dependency.change_indicator,
files=UnwatchedFiles(
root=expand_relative_path(root, files.root),
checksum_path=files.checksum_path,
),
)
return PartialConfiguration(
binary=binary,
buck_mode=self.buck_mode,
disabled=self.disabled,
do_not_ignore_errors_in=[
expand_relative_path(root, path)
for path in self.do_not_ignore_errors_in
],
dot_pyre_directory=self.dot_pyre_directory,
excludes=self.excludes,
extensions=self.extensions,
ide_features=self.ide_features,
ignore_all_errors=[
expand_relative_path(root, path) for path in self.ignore_all_errors
],
ignore_infer=[
expand_relative_path(root, path) for path in self.ignore_infer
],
isolation_prefix=self.isolation_prefix,
logger=logger,
number_of_workers=self.number_of_workers,
oncall=self.oncall,
other_critical_files=[
expand_relative_path(root, path) for path in self.other_critical_files
],
pysa_version_hash=self.pysa_version_hash,
python_version=self.python_version,
shared_memory=self.shared_memory,
search_path=[path.expand_relative_root(root) for path in self.search_path],
source_directories=source_directories,
strict=self.strict,
taint_models_path=[
expand_relative_path(root, path) for path in self.taint_models_path
],
targets=self.targets,
typeshed=typeshed,
unwatched_dependency=unwatched_dependency,
use_buck2=self.use_buck2,
version_hash=self.version_hash,
)
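# A minimal sketch of parsing a configuration file body (contents hypothetical):
#
#   partial = PartialConfiguration.from_string(
#       '{"source_directories": ["src"], "strict": true, "workers": 4}'
#   )
#   # partial.source_directories == [SimpleSearchPathElement("src")]
#   # partial.strict is True; partial.number_of_workers == 4
#   # Unrecognized keys are warned about rather than rejected.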
def merge_partial_configurations(
base: PartialConfiguration, override: PartialConfiguration
) -> PartialConfiguration:
def overwrite_base(base: Optional[T], override: Optional[T]) -> Optional[T]:
return base if override is None else override
def overwrite_base_ide_features(
base: Optional[IdeFeatures], override: Optional[IdeFeatures]
) -> Optional[IdeFeatures]:
if override is None:
return base
if base is None:
return override
return IdeFeatures(
hover_enabled=overwrite_base(base.hover_enabled, override.hover_enabled),
go_to_definition_enabled=overwrite_base(
base.go_to_definition_enabled, override.go_to_definition_enabled
),
)
def prepend_base(base: Sequence[T], override: Sequence[T]) -> Sequence[T]:
return list(override) + list(base)
def raise_when_overridden(
base: Optional[T], override: Optional[T], name: str
) -> Optional[T]:
if base is None:
return override
elif override is None:
return base
else:
raise InvalidConfiguration(
f"Configuration option `{name}` cannot be overridden."
)
return PartialConfiguration(
binary=overwrite_base(base.binary, override.binary),
buck_mode=PlatformAware.merge(base.buck_mode, override.buck_mode),
disabled=overwrite_base(base.disabled, override.disabled),
do_not_ignore_errors_in=prepend_base(
base.do_not_ignore_errors_in, override.do_not_ignore_errors_in
),
dot_pyre_directory=overwrite_base(
base.dot_pyre_directory, override.dot_pyre_directory
),
excludes=prepend_base(base.excludes, override.excludes),
extensions=prepend_base(base.extensions, override.extensions),
ide_features=overwrite_base_ide_features(
base.ide_features, override.ide_features
),
ignore_all_errors=prepend_base(
base.ignore_all_errors, override.ignore_all_errors
),
ignore_infer=prepend_base(base.ignore_infer, override=override.ignore_infer),
isolation_prefix=overwrite_base(
base.isolation_prefix, override.isolation_prefix
),
logger=overwrite_base(base.logger, override.logger),
number_of_workers=overwrite_base(
base.number_of_workers, override.number_of_workers
),
oncall=overwrite_base(base.oncall, override.oncall),
other_critical_files=prepend_base(
base.other_critical_files, override.other_critical_files
),
pysa_version_hash=overwrite_base(
base.pysa_version_hash, override.pysa_version_hash
),
python_version=overwrite_base(base.python_version, override.python_version),
shared_memory=SharedMemory(
heap_size=overwrite_base(
base.shared_memory.heap_size, override.shared_memory.heap_size
),
dependency_table_power=overwrite_base(
base.shared_memory.dependency_table_power,
override.shared_memory.dependency_table_power,
),
hash_table_power=overwrite_base(
base.shared_memory.hash_table_power,
override.shared_memory.hash_table_power,
),
),
search_path=prepend_base(base.search_path, override.search_path),
source_directories=raise_when_overridden(
base.source_directories,
override.source_directories,
name="source_directories",
),
strict=overwrite_base(base.strict, override.strict),
taint_models_path=prepend_base(
base.taint_models_path, override.taint_models_path
),
targets=raise_when_overridden(base.targets, override.targets, name="targets"),
typeshed=overwrite_base(base.typeshed, override.typeshed),
unwatched_dependency=overwrite_base(
base.unwatched_dependency, override.unwatched_dependency
),
use_buck2=overwrite_base(base.use_buck2, override.use_buck2),
version_hash=overwrite_base(base.version_hash, override.version_hash),
)
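# Merge semantics, sketched: scalar options are overwritten, list-valued options
# are concatenated with the override's entries first, and `targets` /
# `source_directories` may be set by at most one of the two layers:
#
#   base = PartialConfiguration(excludes=["a"], strict=True)
#   override = PartialConfiguration(excludes=["b"])
#   merged = merge_partial_configurations(base, override)
#   # merged.excludes == ["b", "a"]
#   # merged.strict is True (the override left it unset)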
@dataclass(frozen=True)
class Configuration:
project_root: str
dot_pyre_directory: Path
binary: Optional[str] = None
buck_mode: Optional[PlatformAware[str]] = None
disabled: bool = False
do_not_ignore_errors_in: Sequence[str] = field(default_factory=list)
excludes: Sequence[str] = field(default_factory=list)
extensions: Sequence[ExtensionElement] = field(default_factory=list)
ide_features: Optional[IdeFeatures] = None
ignore_all_errors: Sequence[str] = field(default_factory=list)
ignore_infer: Sequence[str] = field(default_factory=list)
isolation_prefix: Optional[str] = None
logger: Optional[str] = None
number_of_workers: Optional[int] = None
oncall: Optional[str] = None
other_critical_files: Sequence[str] = field(default_factory=list)
pysa_version_hash: Optional[str] = None
python_version: Optional[PythonVersion] = None
shared_memory: SharedMemory = SharedMemory()
relative_local_root: Optional[str] = None
search_path: Sequence[SearchPathElement] = field(default_factory=list)
source_directories: Optional[Sequence[SearchPathElement]] = None
strict: bool = False
taint_models_path: Sequence[str] = field(default_factory=list)
targets: Optional[Sequence[str]] = None
typeshed: Optional[str] = None
unwatched_dependency: Optional[UnwatchedDependency] = None
use_buck2: bool = False
version_hash: Optional[str] = None
@staticmethod
def from_partial_configuration(
project_root: Path,
relative_local_root: Optional[str],
partial_configuration: PartialConfiguration,
in_virtual_environment: Optional[bool] = None,
) -> "Configuration":
search_path = partial_configuration.search_path
if len(search_path) == 0 and _in_virtual_environment(in_virtual_environment):
LOG.warning("Using virtual environment site-packages in search path...")
search_path = [SimpleSearchPathElement(root) for root in get_site_roots()]
return Configuration(
project_root=str(project_root),
dot_pyre_directory=_get_optional_value(
partial_configuration.dot_pyre_directory, project_root / LOG_DIRECTORY
),
binary=partial_configuration.binary,
buck_mode=partial_configuration.buck_mode,
disabled=_get_optional_value(partial_configuration.disabled, default=False),
do_not_ignore_errors_in=partial_configuration.do_not_ignore_errors_in,
excludes=partial_configuration.excludes,
extensions=partial_configuration.extensions,
ide_features=partial_configuration.ide_features,
ignore_all_errors=partial_configuration.ignore_all_errors,
ignore_infer=partial_configuration.ignore_infer,
isolation_prefix=partial_configuration.isolation_prefix,
logger=partial_configuration.logger,
number_of_workers=partial_configuration.number_of_workers,
oncall=partial_configuration.oncall,
other_critical_files=partial_configuration.other_critical_files,
pysa_version_hash=partial_configuration.pysa_version_hash,
python_version=partial_configuration.python_version,
shared_memory=partial_configuration.shared_memory,
relative_local_root=relative_local_root,
search_path=[
path.expand_global_root(str(project_root)) for path in search_path
],
source_directories=partial_configuration.source_directories,
strict=_get_optional_value(partial_configuration.strict, default=False),
taint_models_path=partial_configuration.taint_models_path,
targets=partial_configuration.targets,
typeshed=partial_configuration.typeshed,
unwatched_dependency=partial_configuration.unwatched_dependency,
use_buck2=_get_optional_value(
partial_configuration.use_buck2, default=False
),
version_hash=partial_configuration.version_hash,
)
@property
def log_directory(self) -> str:
if self.relative_local_root is None:
return str(self.dot_pyre_directory)
return str(self.dot_pyre_directory / self.relative_local_root)
@property
def local_root(self) -> Optional[str]:
if self.relative_local_root is None:
return None
return os.path.join(self.project_root, self.relative_local_root)
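    # Sketch of the derived paths above (roots hypothetical):
    #
    #   config = Configuration(
    #       project_root="/repo",
    #       dot_pyre_directory=Path("/repo/.pyre"),
    #       relative_local_root="tools/app",
    #   )
    #   config.log_directory  # -> "/repo/.pyre/tools/app"
    #   config.local_root     # -> "/repo/tools/app"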
def to_json(self) -> Dict[str, object]:
"""
        This method is for display purposes only. Do *NOT* expect it to
        produce JSON that can be deserialized back into a configuration.
"""
binary = self.binary
buck_mode = self.buck_mode
isolation_prefix = self.isolation_prefix
logger = self.logger
number_of_workers = self.number_of_workers
oncall = self.oncall
pysa_version_hash = self.pysa_version_hash
python_version = self.python_version
relative_local_root = self.relative_local_root
source_directories = self.source_directories
targets = self.targets
typeshed = self.typeshed
unwatched_dependency = self.unwatched_dependency
version_hash = self.version_hash
return {
"global_root": self.project_root,
"dot_pyre_directory": str(self.dot_pyre_directory),
**({"binary": binary} if binary is not None else {}),
**({"buck_mode": buck_mode.to_json()} if buck_mode is not None else {}),
"disabled": self.disabled,
"do_not_ignore_errors_in": list(self.do_not_ignore_errors_in),
"excludes": list(self.excludes),
"extensions": list(self.extensions),
"ignore_all_errors": list(self.ignore_all_errors),
"ignore_infer": list(self.ignore_infer),
**(
{"isolation_prefix": isolation_prefix}
if isolation_prefix is not None
else {}
),
**({"logger": logger} if logger is not None else {}),
**({"oncall": oncall} if oncall is not None else {}),
**({"workers": number_of_workers} if number_of_workers is not None else {}),
"other_critical_files": list(self.other_critical_files),
**(
{"pysa_version_hash": pysa_version_hash}
if pysa_version_hash is not None
else {}
),
**(
{"python_version": python_version.to_string()}
if python_version is not None
else {}
),
**(
{"shared_memory": self.shared_memory.to_json()}
if self.shared_memory != SharedMemory()
else {}
),
**(
{"relative_local_root": relative_local_root}
if relative_local_root is not None
else {}
),
"search_path": [path.path() for path in self.search_path],
**(
{"source_directories": [path.path() for path in source_directories]}
if source_directories is not None
else {}
),
"strict": self.strict,
"taint_models_path": list(self.taint_models_path),
**({"targets": list(targets)} if targets is not None else {}),
**({"typeshed": typeshed} if typeshed is not None else {}),
**(
{"unwatched_dependency": unwatched_dependency.to_json()}
if unwatched_dependency is not None
else {}
),
"use_buck2": self.use_buck2,
**({"version_hash": version_hash} if version_hash is not None else {}),
}
def get_source_directories(self) -> List[SearchPathElement]:
return list(self.source_directories or [])
def get_existent_unwatched_dependency(
self,
) -> Optional[UnwatchedDependency]:
unwatched_dependency = self.unwatched_dependency
if unwatched_dependency is None:
return None
unwatched_root = Path(unwatched_dependency.files.root)
try:
if not unwatched_root.is_dir():
LOG.warning(
"Nonexistent directory passed in to `unwatched_dependency`: "
f"`{unwatched_root}`"
)
return None
checksum_path = unwatched_root / unwatched_dependency.files.checksum_path
if not checksum_path.is_file():
LOG.warning(
"Nonexistent file passed in to `unwatched_dependency`: "
f"`{checksum_path}`"
)
return None
return self.unwatched_dependency
except PermissionError as error:
LOG.warning(str(error))
return None
# Expansion and validation of search paths cannot happen at Configuration creation
# because link trees need to be built first.
def expand_and_get_existent_search_paths(self) -> List[SearchPathElement]:
existent_paths = _expand_and_get_existent_paths(self.search_path)
typeshed_root = self.get_typeshed_respecting_override()
typeshed_paths = (
[]
if typeshed_root is None
else [
SimpleSearchPathElement(str(element))
for element in find_directories.find_typeshed_search_paths(
Path(typeshed_root)
)
]
)
# pyre-ignore: Unsupported operand [58]: `+` is not supported for
# operand types `List[SearchPathElement]` and `Union[List[typing.Any],
# List[SimpleSearchPathElement]]`
return existent_paths + typeshed_paths
def expand_and_filter_nonexistent_paths(self) -> "Configuration":
source_directories = self.source_directories
return Configuration(
project_root=self.project_root,
dot_pyre_directory=self.dot_pyre_directory,
binary=self.binary,
buck_mode=self.buck_mode,
disabled=self.disabled,
do_not_ignore_errors_in=self.do_not_ignore_errors_in,
excludes=self.excludes,
extensions=self.extensions,
ide_features=self.ide_features,
ignore_all_errors=self.ignore_all_errors,
ignore_infer=self.ignore_infer,
isolation_prefix=self.isolation_prefix,
logger=self.logger,
number_of_workers=self.number_of_workers,
oncall=self.oncall,
other_critical_files=self.other_critical_files,
pysa_version_hash=self.pysa_version_hash,
python_version=self.python_version,
shared_memory=self.shared_memory,
relative_local_root=self.relative_local_root,
search_path=self.search_path,
source_directories=_expand_and_get_existent_paths(source_directories)
if source_directories
else None,
strict=self.strict,
taint_models_path=self.taint_models_path,
targets=self.targets,
typeshed=self.typeshed,
unwatched_dependency=self.unwatched_dependency,
use_buck2=self.use_buck2,
version_hash=self.version_hash,
)
def get_existent_ignore_infer_paths(self) -> List[str]:
existent_paths = []
for path in self.ignore_infer:
if os.path.exists(path):
existent_paths.append(path)
else:
LOG.warn(f"Filtering out nonexistent path in `ignore_infer`: {path}")
return existent_paths
def get_existent_do_not_ignore_errors_in_paths(self) -> List[str]:
"""
This is a separate method because we want to check for existing files
at the time this is called, not when the configuration is
constructed.
"""
ignore_paths = [
_expand_global_root(path, global_root=self.project_root)
for path in self.do_not_ignore_errors_in
]
paths = []
for path in ignore_paths:
if os.path.exists(path):
paths.append(path)
else:
LOG.debug(
"Filtering out nonexistent paths in `do_not_ignore_errors_in`: "
f"{path}"
)
return paths
def get_existent_ignore_all_errors_paths(self) -> List[str]:
"""
This is a separate method because we want to check for existing files
at the time this is called, not when the configuration is
constructed.
"""
return _expand_and_get_existent_ignore_all_errors_path(
self.ignore_all_errors, self.project_root
)
def get_binary_respecting_override(self) -> Optional[str]:
binary = self.binary
if binary is not None:
return binary
LOG.info(f"No binary specified, looking for `{BINARY_NAME}` in PATH")
binary_candidate = shutil.which(BINARY_NAME)
if binary_candidate is None:
binary_candidate_name = os.path.join(
os.path.dirname(sys.argv[0]), BINARY_NAME
)
binary_candidate = shutil.which(binary_candidate_name)
if binary_candidate is not None:
return binary_candidate
return None
def get_typeshed_respecting_override(self) -> Optional[str]:
typeshed = self.typeshed
if typeshed is not None:
return typeshed
LOG.info("No typeshed specified, looking for it...")
auto_determined_typeshed = find_directories.find_typeshed()
if auto_determined_typeshed is None:
LOG.warning(
"Could not find a suitable typeshed. Types for Python builtins "
"and standard libraries may be missing!"
)
return None
else:
LOG.info(f"Found: `{auto_determined_typeshed}`")
return str(auto_determined_typeshed)
def get_version_hash_respecting_override(self) -> Optional[str]:
overriding_version_hash = os.getenv("PYRE_VERSION_HASH")
if overriding_version_hash:
LOG.warning(f"Version hash overridden with `{overriding_version_hash}`")
return overriding_version_hash
return self.version_hash
def get_binary_version(self) -> Optional[str]:
binary = self.get_binary_respecting_override()
if binary is None:
return None
status = subprocess.run(
[binary, "-version"], stdout=subprocess.PIPE, universal_newlines=True
)
return status.stdout.strip() if status.returncode == 0 else None
def get_number_of_workers(self) -> int:
number_of_workers = self.number_of_workers
if number_of_workers is not None and number_of_workers > 0:
return number_of_workers
try:
default_number_of_workers = max(multiprocessing.cpu_count() - 4, 1)
except NotImplementedError:
default_number_of_workers = 4
LOG.info(
"Could not determine the number of Pyre workers from configuration. "
f"Auto-set the value to {default_number_of_workers}."
)
return default_number_of_workers
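    # For example, on a 16-core machine with no `workers` setting, the default
    # above works out to max(16 - 4, 1) == 12 workers.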
def is_hover_enabled(self) -> bool:
if self.ide_features is None:
return IdeFeatures.DEFAULT_HOVER_ENABLED
return self.ide_features.is_hover_enabled()
def is_go_to_definition_enabled(self) -> bool:
if self.ide_features is None:
return IdeFeatures.DEFAULT_GO_TO_DEFINITION_ENABLED
return self.ide_features.is_go_to_definition_enabled()
def get_valid_extension_suffixes(self) -> List[str]:
        valid_extensions = []
        for extension in self.extensions:
            if not extension.suffix.startswith("."):
                LOG.warning(
                    "Filtering out extension which does not start with `.`: "
                    f"`{extension.suffix}`"
                )
            else:
                valid_extensions.append(extension.command_line_argument())
        return valid_extensions
def get_isolation_prefix_respecting_override(self) -> Optional[str]:
"""We need this to disable an isolation prefix set in a configuration.
Merely omitting the CLI flag would not disable the isolation prefix
because we would just fall back to the configuration value.
With this, we can pass `--isolation-prefix ''` as a CLI argument or
override `isolation_prefix` as `""` in a local configuration."""
return None if self.isolation_prefix == "" else self.isolation_prefix
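    # Illustrative: an explicitly empty prefix disables any inherited one, while
    # other values pass through unchanged (paths hypothetical):
    #
    #   Configuration(
    #       project_root="/repo",
    #       dot_pyre_directory=Path("/repo/.pyre"),
    #       isolation_prefix="",
    #   ).get_isolation_prefix_respecting_override()
    #   # -> None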
def get_python_version(self) -> PythonVersion:
python_version = self.python_version
if python_version is not None:
return python_version
else:
version_info = sys.version_info
return PythonVersion(
major=version_info.major,
minor=version_info.minor,
micro=version_info.micro,
)
def create_configuration(
arguments: command_arguments.CommandArguments, base_directory: Path
) -> Configuration:
local_root_argument = arguments.local_configuration
search_base = (
base_directory
if local_root_argument is None
else base_directory / local_root_argument
)
found_root = find_directories.find_global_and_local_root(search_base)
    # If the local root was explicitly specified but does not exist, raise an
    # error instead of falling back to the current directory.
if local_root_argument is not None:
if found_root is None:
raise InvalidConfiguration(
"A local configuration path was explicitly specified, but no"
+ f" {CONFIGURATION_FILE} file was found in {search_base}"
+ " or its parents."
)
elif found_root.local_root is None:
raise InvalidConfiguration(
"A local configuration path was explicitly specified, but no"
+ f" {LOCAL_CONFIGURATION_FILE} file was found in {search_base}"
+ " or its parents."
)
command_argument_configuration = PartialConfiguration.from_command_arguments(
arguments
).expand_relative_paths(str(Path.cwd()))
if found_root is None:
project_root = Path.cwd()
relative_local_root = None
partial_configuration = command_argument_configuration
else:
project_root = found_root.global_root
relative_local_root = None
partial_configuration = PartialConfiguration.from_file(
project_root / CONFIGURATION_FILE
).expand_relative_paths(str(project_root))
local_root = found_root.local_root
if local_root is not None:
relative_local_root = get_relative_local_root(project_root, local_root)
partial_configuration = merge_partial_configurations(
base=partial_configuration,
override=PartialConfiguration.from_file(
local_root / LOCAL_CONFIGURATION_FILE
).expand_relative_paths(str(local_root)),
)
partial_configuration = merge_partial_configurations(
base=partial_configuration,
override=command_argument_configuration,
)
configuration = Configuration.from_partial_configuration(
project_root, relative_local_root, partial_configuration
)
return configuration.expand_and_filter_nonexistent_paths()
def check_nested_local_configuration(configuration: Configuration) -> None:
"""
Raises `InvalidConfiguration` if the check fails.
"""
local_root = configuration.local_root
if local_root is None:
return
def is_subdirectory(child: Path, parent: Path) -> bool:
return parent == child or parent in child.parents
# We search from the parent of the local root, looking for another local
# configuration file that lives above the current one
local_root_path = Path(local_root).resolve()
current_directory = local_root_path.parent
while True:
found_root = find_directories.find_global_and_local_root(current_directory)
if found_root is None:
break
nesting_local_root = found_root.local_root
if nesting_local_root is None:
break
nesting_configuration = PartialConfiguration.from_file(
nesting_local_root / LOCAL_CONFIGURATION_FILE
).expand_relative_paths(str(nesting_local_root))
nesting_ignored_all_errors_path = (
_expand_and_get_existent_ignore_all_errors_path(
nesting_configuration.ignore_all_errors, str(found_root.global_root)
)
)
if not any(
is_subdirectory(child=local_root_path, parent=Path(path))
for path in nesting_ignored_all_errors_path
):
error_message = (
"Local configuration is nested under another local configuration at "
f"`{nesting_local_root}`.\nPlease add `{local_root_path}` to the "
"`ignore_all_errors` field of the parent, or combine the sources "
"into a single configuration, or split the parent configuration to "
"avoid inconsistent errors."
)
raise InvalidConfiguration(error_message)
current_directory = nesting_local_root.parent
def check_open_source_version(configuration: Configuration) -> None:
"""
Check if version specified in configuration matches running version and warn
if it does not.
"""
expected_version = configuration.version_hash
if expected_version is None or not re.match(r"\d+\.\d+\.\d+", expected_version):
return
try:
# pyre-ignore[21]: dynamic import
from pyre_check import __version__ as actual_version
if expected_version != actual_version:
LOG.warning(
textwrap.dedent(
f"""\
Your running version does not match the configured version for this
project (running {actual_version}, expected {expected_version})."""
)
)
except ImportError:
pass