client/filesystem.py (184 lines of code) (raw):
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import errno
import fcntl
import logging
import os
import shutil
import subprocess
from contextlib import contextmanager
from pathlib import Path
from typing import ContextManager, Dict, Generator, Iterable, List, Optional, Set
from .exceptions import EnvironmentException
# Module-level logger, named after this module.
LOG: logging.Logger = logging.getLogger(__name__)
def assert_readable_directory(directory: str, error_message_prefix: str = "") -> None:
    """Check that `directory` is an existing directory with read permission.

    `error_message_prefix` is placed verbatim at the start of the message of
    any exception raised here.

    Raises:
        EnvironmentException: when `directory` is not a directory, or when it
            is a directory that cannot be read.
    """
    is_directory = os.path.isdir(directory)
    if not is_directory:
        raise EnvironmentException(
            f"{error_message_prefix}`{directory}` is not a valid directory."
        )

    # Permission is only probed once we know the path is a directory.
    is_readable = os.access(directory, os.R_OK)
    if not is_readable:
        raise EnvironmentException(
            f"{error_message_prefix}`{directory}` is not a readable directory."
        )
def readable_directory(directory: str) -> str:
    """Validate `directory` via `assert_readable_directory` and return it.

    The argument is returned unchanged; any exception raised by the
    validation propagates to the caller.
    """
    assert_readable_directory(directory)
    validated = directory
    return validated
def assert_writable_directory(directory: str) -> None:
    """Check that `directory` is an existing directory with write permission.

    Raises:
        EnvironmentException: when `directory` is not a directory, or when it
            is a directory that cannot be written to.
    """
    is_directory = os.path.isdir(directory)
    if not is_directory:
        raise EnvironmentException(f"{directory} is not a valid directory.")

    # Permission is only probed once we know the path is a directory.
    is_writable = os.access(directory, os.W_OK)
    if not is_writable:
        raise EnvironmentException(f"{directory} is not a writable directory.")
def writable_directory(path: str) -> str:
    """Ensure `path` exists as a writable directory and return it absolutized.

    The directory (and any missing parents) is created when absent. The
    absolute form of `path` is then validated with
    `assert_writable_directory`, whose exceptions propagate, and returned.
    """
    try:
        os.makedirs(path)
    except FileExistsError:
        # Something is already there; the validation below decides whether
        # it is actually a usable directory.
        pass

    absolute_path = os.path.abspath(path)
    assert_writable_directory(absolute_path)
    return absolute_path
def translate_path(root: str, path: str) -> str:
    """Resolve `path` against `root` when that yields an existing location.

    Absolute paths are returned untouched. A relative `path` is joined onto
    `root`; if the joined path exists its real path (symlinks resolved) is
    returned, otherwise the original `path` is returned as given.
    """
    if os.path.isabs(path):
        return path

    candidate = os.path.join(root, path)
    return os.path.realpath(candidate) if os.path.exists(candidate) else path
def expand_relative_path(root: str, path: str) -> str:
    """Expand `~` in `path` and anchor it at `root` if it is still relative.

    Returns the user-expanded path as a string when it is absolute, and
    `root` joined with the expanded path otherwise.
    """
    candidate = Path(path).expanduser()
    if not candidate.is_absolute():
        candidate = Path(root) / candidate
    return str(candidate)
def translate_paths(paths: Set[str], original_directory: str) -> Set[str]:
    """Translate `paths` given relative to `original_directory`.

    When `original_directory` starts with the current working directory
    (a plain string-prefix comparison), each path is passed through
    `translate_path` using `original_directory` expressed relative to the
    working directory as the root. In every other case `paths` is returned
    unchanged.
    """
    working_directory = os.getcwd()
    if original_directory.startswith(working_directory):
        relative_root = os.path.relpath(original_directory, working_directory)
        # Guard against an empty relative root: nothing to translate against.
        if relative_root:
            return {translate_path(relative_root, path) for path in paths}
    return paths
def exists(path: str) -> str:
    """Return `path` unchanged if it names a regular file.

    Raises:
        ValueError: when `path` is not a file (missing, a directory, ...).
    """
    if os.path.isfile(path):
        return path
    raise ValueError(f"{path} is not a valid file")
def file_or_directory_exists(path: str) -> str:
    """Return `path` unchanged if it names a directory or a regular file.

    Raises:
        ValueError: when `path` is neither a directory nor a file.
    """
    if not os.path.isdir(path) and not os.path.isfile(path):
        raise ValueError(f"{path} is not a valid path")
    return path
def is_parent(parent: str, child: str) -> bool:
    """Tell whether `child` lies strictly below `parent`, textually.

    This is a pure string comparison: trailing separators are dropped from
    `parent`, exactly one separator is appended, and `child` must start with
    the result. A path is therefore never its own parent.
    """
    # Normalising to a single trailing separator stops "/a/b" from being
    # reported as a parent of "/a/bc".
    required_prefix = parent.rstrip(os.sep) + os.sep
    return child.startswith(required_prefix)
def find_paths_with_extensions(root: str, extensions: Iterable[str]) -> List[str]:
    """List files and symlinks under `root` whose name ends in an extension.

    The search is delegated to the external `find` command. `root` is made
    absolute first, so every returned path is absolute.

    Args:
        root: Directory to search recursively.
        extensions: Extensions without the leading dot (e.g. `"py"`).

    Returns:
        Matching paths, in the order `find` reports them. An empty
        `extensions` yields an empty list without running `find`.

    Raises:
        subprocess.CalledProcessError: when `find` exits with a non-zero
            status.
    """
    root = os.path.abspath(root)  # Return absolute paths.

    extension_filter: List[str] = []
    for extension in extensions:
        if extension_filter:
            extension_filter.append("-or")
        extension_filter.extend(["-name", f"*.{extension}"])

    if not extension_filter:
        # `find` rejects an empty `( )` group; with no extensions requested
        # nothing can match, so answer directly.
        return []

    output = subprocess.check_output(
        [
            "find",
            root,
            # All files ending with the given extensions ...
            "(",
            *extension_filter,
            ")",
            # ... and that are either regular files ...
            "(",
            "-type",
            "f",
            "-or",
            # ... or symlinks.
            "-type",
            "l",
            ")",
            # Print all such files, NUL-terminated so that file names
            # containing newlines are not split into several paths.
            "-print0",
        ],
        stderr=subprocess.DEVNULL,
    ).decode("utf-8")

    # Every entry ends with NUL, so the final split element is empty.
    return [path for path in output.split("\0") if path]
def find_python_paths(root: str) -> List[str]:
    """Return the `.py` and `.pyi` paths found under `root`.

    Raises:
        EnvironmentException: when the underlying search fails with a
            `subprocess.CalledProcessError`.
    """
    python_extensions = ["py", "pyi"]
    try:
        return find_paths_with_extensions(root, python_extensions)
    except subprocess.CalledProcessError:
        message = (
            "Pyre was unable to locate an analysis directory. "
            "Ensure that your project is built and re-run pyre."
        )
        raise EnvironmentException(message)
def is_empty(path: str) -> bool:
    """Tell whether `path` exists and has a size of zero bytes.

    A missing path is reported as not empty (`False`) rather than raising.
    """
    try:
        status = os.stat(path)
    except FileNotFoundError:
        return False
    return status.st_size == 0
def remove_if_exists(path: str) -> None:
    """Best-effort removal of `path`, whether it is a file or a directory tree.

    Both a file removal and a recursive directory removal are attempted, in
    that order; any `OSError` from either attempt is ignored.
    """
    for remover in (os.remove, shutil.rmtree):
        try:
            remover(path)
        except OSError:
            # `path` is not the kind of entry this remover handles (or is
            # already gone); move on to the next attempt.
            pass
def _compute_symbolic_link_mapping(
    directory: str, extensions: Iterable[str]
) -> Dict[str, str]:
    """
    Build a mapping from real source files to their entries in `directory`.

    Every path under `directory` with one of the given `extensions` is
    resolved with `os.path.realpath`; the resolved path becomes the key and
    the path inside `directory` the value.
    Watchman watches actual source files, so when a change is detected to a
    file, this mapping can be used to identify what file changed from Pyre's
    perspective.
    If the file search fails, warnings are logged and an empty mapping is
    returned.
    """
    try:
        discovered = find_paths_with_extensions(directory, extensions)
    except subprocess.CalledProcessError as error:
        LOG.warning(
            "Exception encountered trying to find source files "
            + "in the analysis directory: `%s`",
            error,
        )
        LOG.warning("Starting with an empty set of tracked files.")
        return {}

    # Later entries win when several links resolve to the same real file.
    return {os.path.realpath(link): link for link in discovered}
def _delete_symbolic_link(link_path: str) -> None:
    """Unlink the filesystem entry at `link_path`.

    Errors from `os.unlink` (for example a missing path) propagate.
    """
    os.unlink(link_path)
def add_symbolic_link(link_path: str, actual_path: str) -> None:
    """Create (or replace) a symlink at `link_path` pointing to `actual_path`.

    The parent directory of `link_path` is created on a best-effort basis.
    If something already exists at `link_path` it is unlinked and the link is
    created again; errors from that second attempt propagate. Any other
    `OSError` from the first attempt is logged and swallowed.
    """
    parent_directory = os.path.dirname(link_path)
    try:
        os.makedirs(parent_directory)
    except OSError:
        # Already present or not creatable; symlink creation below reports
        # any real problem.
        pass

    try:
        os.symlink(actual_path, link_path)
    except FileExistsError:
        # Replace whatever currently occupies the link location.
        os.unlink(link_path)
        os.symlink(actual_path, link_path)
    except OSError as error:
        LOG.error(str(error))
def _lock_command(blocking: bool, is_shared_reader: bool) -> int:
    """Compose the `fcntl` lock operation for the requested lock mode.

    Shared readers get `LOCK_SH`, everyone else `LOCK_EX`; `LOCK_NB` is
    OR-ed in when the caller does not want to block.
    """
    if is_shared_reader:
        operation = fcntl.LOCK_SH
    else:
        operation = fcntl.LOCK_EX
    if not blocking:
        operation |= fcntl.LOCK_NB
    return operation
@contextmanager
def acquire_lock(
    path: str, blocking: bool, is_shared_reader: bool = False
) -> Generator[Optional[int], None, None]:
    """Hold an `fcntl` lock on the file at `path` for the duration of the body.

    Raise an OSError if `blocking` is False and the lock can't be acquired.
    If `is_shared_reader=True`, then other processes can acquire the same
    lock with `is_shared_reader=True`, but not with `is_shared_reader=False`.
    Conversely, if `is_shared_reader=False`, then no other process can
    acquire the lock until it is released.

    The lock file is opened with mode `"w+"`. When opening it fails with
    `FileNotFoundError`, no lock is taken: the situation is logged at debug
    level and the body runs with `None`.

    Yields:
        The lock file's descriptor, or `None` when the lock file could not
        be opened.
    """
    LOG.debug(
        "Trying to acquire %slock on file %s",
        "shared reader " if is_shared_reader else "",
        path,
    )

    # Only the `open` call is guarded. Were the `yield` inside this `try`, a
    # FileNotFoundError raised by the caller's `with` body would be caught
    # here, mis-reported as a missing lock file, and followed by a second
    # `yield`, which makes `contextmanager` fail with RuntimeError.
    try:
        lockfile = open(path, "w+")
    except FileNotFoundError:
        lockfile = None

    if lockfile is None:
        LOG.debug(f"Unable to acquire lock because lock file {path} was not found")
        yield None
        return

    with lockfile:
        try:
            fcntl.lockf(lockfile.fileno(), _lock_command(blocking, is_shared_reader))
            yield lockfile.fileno()
        finally:
            # Release explicitly before the file is closed.
            fcntl.lockf(lockfile.fileno(), fcntl.LOCK_UN)
@contextmanager
def do_nothing() -> Generator[None, None, None]:
    """Context manager that performs no setup or teardown and yields `None`."""
    yield None
def acquire_lock_if_needed(
    lock_path: str, blocking: bool, needed: bool
) -> ContextManager[Optional[int]]:
    """Return a lock-holding context manager only when `needed` is true.

    With `needed` set, this is `acquire_lock(lock_path, blocking)`;
    otherwise it is the no-op `do_nothing()` context manager.
    """
    if not needed:
        return do_nothing()
    return acquire_lock(lock_path, blocking)