tools/upgrade/commands/targets_to_configuration.py (324 lines of code) (raw):

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import argparse
import logging
import subprocess
from pathlib import Path
from typing import Dict, List, Optional

import libcst
from typing_extensions import Final

from ..configuration import Configuration
from ..errors import Errors
from ..filesystem import (
    LocalMode,
    Target,
    add_local_mode,
    find_directories,
    find_files,
    find_targets,
    get_filesystem,
    remove_non_pyre_ignores,
)
from ..repository import Repository
from .command import CommandArguments, ErrorSuppressingCommand
from .strict_default import StrictDefault


LOG: logging.Logger = logging.getLogger(__name__)


def _is_same_or_descendant(path: Path, ancestor: Path) -> bool:
    """Return True when `path` is `ancestor` itself or lies underneath it.

    The comparison works on whole path components, so `a/bc` is not considered
    to be inside `a/b` (which a plain string-prefix test would claim).
    """
    return path == ancestor or ancestor in path.parents


class TargetPyreRemover(libcst.CSTTransformer):
    """Strip type-checking keyword arguments from calls that opt into pyre.

    A call is rewritten only when it has a `check_types` keyword whose value is
    the name `true` (case-insensitive) and no `check_types_options` string that
    mentions `mypy`. For such calls the `check_types`, `check_types_options`,
    `typing` and `typing_options` keyword arguments are dropped; every other
    argument is kept.
    """

    def leave_Call(
        self, original_node: libcst.Call, updated_node: libcst.Call
    ) -> libcst.Call:
        """Return `updated_node` with the typing-related keywords removed.

        Args:
            original_node: The call as it appeared before child nodes were
                visited. Unused; the rewrite is based on `updated_node`.
            updated_node: The call after its children have been transformed.

        Returns:
            A copy of `updated_node` without the typing keywords when the call
            enables pyre type checking, otherwise `updated_node` unchanged.
        """
        check_types = False
        uses_pyre = True
        updated_fields = []
        # Work from `updated_node` so that rewrites already applied to nested
        # nodes are not thrown away when the argument list is replaced.
        for field in updated_node.args:
            keyword = field.keyword
            value = field.value
            if keyword is None:
                # Positional arguments are never typing options; keep them.
                updated_fields.append(field)
                continue
            name = keyword.value
            if name == "check_types":
                if isinstance(value, libcst.Name):
                    check_types = check_types or value.value.lower() == "true"
            elif name == "check_types_options":
                if isinstance(value, libcst.SimpleString):
                    uses_pyre = uses_pyre and "mypy" not in value.value.lower()
            elif name not in ["typing", "typing_options"]:
                updated_fields.append(field)
        if check_types and uses_pyre:
            return updated_node.with_changes(args=updated_fields)
        return updated_node


class TargetsToConfiguration(ErrorSuppressingCommand):
    """Convert type-checked targets in TARGETS files to pyre configurations.

    For each directory handled, the command collects the type-checked targets,
    writes them to a `.pyre_configuration.local`, removes the typing options
    from the TARGETS files and suppresses the resulting errors.
    """

    def __init__(
        self,
        command_arguments: CommandArguments,
        *,
        repository: Repository,
        subdirectory: Optional[str],
        glob: Optional[int],
        fixme_threshold: Optional[int],
        pyre_only: bool,
        strict: bool,
        only_clean_targets: bool,
    ) -> None:
        """Store the command options.

        Args:
            command_arguments: Shared arguments forwarded to the base command.
            repository: Repository used to add paths and commit changes.
            subdirectory: Only upgrade TARGETS files within this directory; the
                current working directory is used when unset.
            glob: Per-file error threshold for using a toplevel glob target
                instead of listing individual targets; None disables the glob.
            fixme_threshold: Ignore all errors in a file if its error count
                exceeds this threshold.
            pyre_only: Only convert pyre targets to configuration.
            strict: Turn on default strict mode if any targets were strict.
            only_clean_targets: Only perform target cleanup without affecting
                pyre configurations.
        """
        super().__init__(command_arguments, repository)
        self._subdirectory: Final[Optional[str]] = subdirectory
        self._glob_threshold: Optional[int] = glob
        self._fixme_threshold: Optional[int] = fixme_threshold
        self._pyre_only: bool = pyre_only
        self._strict: bool = strict
        self._only_clean_targets: bool = only_clean_targets

    @staticmethod
    def from_arguments(
        arguments: argparse.Namespace, repository: Repository
    ) -> "TargetsToConfiguration":
        """Build the command from parsed command-line arguments."""
        command_arguments = CommandArguments.from_arguments(arguments)
        return TargetsToConfiguration(
            command_arguments,
            repository=repository,
            subdirectory=arguments.subdirectory,
            glob=arguments.glob,
            fixme_threshold=arguments.fixme_threshold,
            pyre_only=arguments.pyre_only,
            strict=arguments.strict,
            only_clean_targets=arguments.only_clean_targets,
        )

    @classmethod
    # pyre-fixme[40]: Non-static method `add_arguments` cannot override a static
    #  method defined in `ErrorSuppressingCommand`.
    def add_arguments(cls, parser: argparse.ArgumentParser) -> None:
        """Register this command's options on `parser`.

        Also sets `from_arguments` as the parser's default `command`.
        """
        super(TargetsToConfiguration, cls).add_arguments(parser)
        parser.set_defaults(command=cls.from_arguments)
        parser.add_argument(
            "--subdirectory", help="Only upgrade TARGETS files within this directory."
        )
        parser.add_argument(
            "--glob",
            type=int,
            # Adjacent literals rather than a backslash continuation inside the
            # string, which would embed the source indentation in the help text.
            help="Use a toplevel glob target instead of listing individual targets. "
            "Fall back to individual targets if errors per file ever hits given "
            "threshold.",
        )
        parser.add_argument(
            "--fixme-threshold",
            type=int,
            help="Ignore all errors in a file if fixme count exceeds threshold.",
        )
        parser.add_argument(
            "--strict",
            action="store_true",
            help="Turn on default strict mode if any targets were strict.",
        )
        parser.add_argument(
            "--pyre-only",
            action="store_true",
            help="Only convert pyre targets to configuration.",
        )
        parser.add_argument(
            "--only-clean-targets",
            action="store_true",
            help="Only perform target cleanup without affecting pyre configurations.",
        )

    def remove_target_typing_fields(self, files: List[Path]) -> None:
        """Remove typing options from the given TARGETS files, in place.

        With `pyre_only` set and no glob threshold, each file is parsed and
        rewritten through `TargetPyreRemover`. Otherwise a single `sed -i`
        command deletes every line that matches one of the typing options.

        Args:
            files: Paths of the TARGETS files to rewrite.
        """
        LOG.info("Removing typing options from %s targets files", len(files))
        if not files:
            # Nothing to rewrite; also avoids running `sed -i` with no operands.
            return
        # NOTE(review): a glob threshold of 0 is falsy and therefore takes the
        # libcst path here, while `convert_directory` treats 0 as "glob
        # requested" - confirm which is intended.
        if self._pyre_only and not self._glob_threshold:
            for path in files:
                targets_file = Path(path)
                source = targets_file.read_text()
                output = libcst.parse_module(source).visit(TargetPyreRemover()).code
                targets_file.write_text(output)
        else:
            typing_options_regex = [
                r"typing \?=.*",
                r"check_types \?=.*",
                r"check_types_options \?=.*",
                r"typing_options \?=.*",
                r"type_checker \?=.*",
            ]
            remove_typing_fields_command = [
                "sed",
                "-i",
                "/" + r"\|".join(typing_options_regex) + "/d",
            ] + [str(file) for file in files]
            # The exit status of sed is not checked.
            subprocess.run(remove_typing_fields_command)

    def find_or_create_configuration(
        self, directory: Path, new_targets: List[str]
    ) -> Configuration:
        """Return the local configuration of `directory` containing `new_targets`.

        An existing `.pyre_configuration.local` is amended with the new targets
        and deduplicated. Otherwise a new one holding only `new_targets` is
        written and added to version control.

        Args:
            directory: Directory that holds (or will hold) the configuration.
            new_targets: Targets to record in the configuration.

        Returns:
            The configuration, already written to disk.
        """
        configuration_path = directory / ".pyre_configuration.local"
        if configuration_path.exists():
            # Adjacent literals keep source indentation out of the log message.
            LOG.warning(
                "Pyre project already exists at %s.\n"
                "Amending targets to existing configuration.",
                configuration_path,
            )
            configuration = Configuration(configuration_path)
            configuration.add_targets(new_targets)
            configuration.deduplicate_targets()
            configuration.write()
        else:
            LOG.info("Creating local configuration at %s.", configuration_path)
            configuration_contents = {"targets": new_targets}
            configuration = Configuration(configuration_path, configuration_contents)
            configuration.write()

            # Add newly created configuration files to version control
            self._repository.add_paths([configuration_path])
        return configuration

    def collect_full_targets(self, targets: Dict[str, List[Target]]) -> List[str]:
        """Build `//<directory>:<name>` strings for all type-checked targets.

        Args:
            targets: Mapping from a TARGETS file path to the targets it defines.

        Returns:
            One entry per target whose `check_types` is truthy, with every
            `/TARGETS` occurrence removed from the file path.
        """
        return [
            "//" + path.replace("/TARGETS", "") + ":" + target.name
            for path, path_targets in targets.items()
            for target in path_targets
            if target.check_types
        ]

    def convert_directory(self, directory: Path) -> None:
        """Convert the targets found under `directory` to a pyre configuration.

        Steps: collect the targets, create or amend the local configuration,
        optionally try a glob target, strip typing options from the TARGETS
        files, suppress errors (or ignore whole files above the fixme
        threshold), and finally either run the strict-default codemod or
        suppress the remaining errors.

        Args:
            directory: Directory whose targets are converted.
        """
        all_targets = find_targets(directory, pyre_only=self._pyre_only)
        if not all_targets:
            LOG.warning("No configuration created because no targets found.")
            return

        # Set strict default to true if any binary or unittest targets are strict.
        apply_strict = self._strict and any(
            target.strict
            for target_list in all_targets.values()
            for target in target_list
        )

        # Collect targets.
        new_targets = self.collect_full_targets(all_targets)
        targets_files = [Path(path) for path in all_targets.keys()]
        configuration = self.find_or_create_configuration(directory, new_targets)

        # Try setting a glob target.
        glob_threshold = self._glob_threshold
        all_errors = None
        if glob_threshold is not None:
            original_targets = configuration.targets
            configuration.targets = ["//" + str(directory) + "/..."]
            configuration.write()

            all_errors = configuration.get_errors()
            if any(
                len(errors) > glob_threshold
                for errors in all_errors.paths_to_errors.values()
            ):
                # Fall back to non-glob codemod.
                LOG.info(
                    "Exceeding error threshold of %d; falling back to listing "
                    "individual targets.",
                    glob_threshold,
                )
                configuration.targets = original_targets
                configuration.write()
                all_errors = configuration.get_errors()
            else:
                # The glob covers every TARGETS file below the directory, so all
                # of them get their typing options removed.
                targets_files = [
                    directory / path
                    for path in get_filesystem().list(
                        str(directory), patterns=[r"**/TARGETS"]
                    )
                ]
        # Only fetch errors when the glob attempt did not already do so. Testing
        # for None keeps an empty result from triggering a redundant second run.
        if all_errors is None:
            all_errors = configuration.get_errors()

        # Remove all type-related target settings.
        self.remove_target_typing_fields(targets_files)
        if not self._pyre_only:
            remove_non_pyre_ignores(directory)

        # Suppress errors in individual files where fixme threshold is not exceeded.
        error_threshold = self._fixme_threshold
        for path, errors in all_errors.paths_to_errors.items():
            errors = list(errors)
            error_count = len(errors)
            # An unset (or zero) threshold never triggers a file-level ignore.
            if error_threshold and error_count > error_threshold:
                LOG.info(
                    "%d errors found in `%s`. Adding file-level ignore.",
                    error_count,
                    path,
                )
                add_local_mode(path, LocalMode.IGNORE)
            else:
                self._apply_suppressions(Errors(errors))

        # Spin up strict codemod if applicable, otherwise skip to final clean and lint.
        if apply_strict:
            LOG.info(
                "Some targets were running strict type checking. "
                "Adding strict setting to configuration."
            )
            strict_codemod = StrictDefault(
                command_arguments=CommandArguments(
                    comment=self._comment,
                    max_line_length=self._max_line_length,
                    truncate=self._truncate,
                    unsafe=self._unsafe,
                    force_format_unsuppressed=self._force_format_unsuppressed,
                    lint=self._lint,
                    no_commit=True,
                    should_clean=True,
                ),
                repository=self._repository,
                local_configuration=directory,
                remove_strict_headers=True,
                fixme_threshold=0,
            )
            strict_codemod.run()
        else:
            self._get_and_suppress_errors(configuration)

    def _gather_directories(self, subdirectory: Path) -> List[Path]:
        """List the directories that need to be converted below `subdirectory`.

        Directories that already hold a `.pyre_configuration.local` come first.
        If there are none, `subdirectory` itself is the only result. Otherwise
        the existing configuration directories are walked from shallowest to
        deepest, and any directory found along the way that neither is nor
        contains a configuration directory is appended to fill in coverage.

        Args:
            subdirectory: Root of the search.

        Returns:
            The directories to convert, as paths.
        """
        configurations = find_files(subdirectory, ".pyre_configuration.local")
        configuration_directories = [
            configuration.replace("/.pyre_configuration.local", "")
            for configuration in configurations
        ]
        sorted_directories = sorted(
            (directory.split("/") for directory in configuration_directories),
            key=lambda directory: (len(directory), directory),
        )
        if not configuration_directories:
            configuration_directories = [str(subdirectory)]
        else:
            # Fill in missing coverage
            configuration_paths = [
                Path(directory) for directory in configuration_directories
            ]
            missing_directories = []
            current_depth = len(str(subdirectory).split("/"))
            for directory in sorted_directories:
                if len(directory) <= current_depth:
                    continue
                all_subdirectories = find_directories(
                    Path("/".join(directory[0:current_depth]))
                )
                for candidate in all_subdirectories:
                    # Compare by path component: a raw string-prefix test would
                    # treat `a/b` as covered by a configuration in `a/bc`.
                    candidate_path = Path(candidate)
                    if not any(
                        _is_same_or_descendant(configuration_path, candidate_path)
                        for configuration_path in configuration_paths
                    ):
                        missing_directories.append(candidate)
                current_depth += 1
            configuration_directories.extend(missing_directories)
        return [Path(directory) for directory in configuration_directories]

    def run(self) -> None:
        """Run the conversion, or only the target cleanup, then commit.

        With `only_clean_targets` set, typing options are stripped from the
        TARGETS files under the subdirectory and nothing else happens.
        Otherwise every gathered directory not already inside a converted one
        is converted, and the changes are handed to the repository for commit.
        """
        # TODO(T62926437): Basic integration testing.
        subdirectory = self._subdirectory
        subdirectory = Path(subdirectory) if subdirectory else Path.cwd()

        if self._only_clean_targets:
            LOG.info(
                "Cleaning typecheck targets from `%s`.",
                subdirectory,
            )
            LOG.info("No pyre configurations will be affected.")
            all_targets = find_targets(subdirectory, pyre_only=self._pyre_only)
            if not all_targets:
                LOG.warning("No targets found.")
                return
            targets_files = [Path(path) for path in all_targets.keys()]
            self.remove_target_typing_fields(targets_files)
            return

        LOG.info(
            "Converting typecheck targets to pyre configurations in `%s`.",
            subdirectory,
        )

        configuration_directories = self._gather_directories(subdirectory)
        converted: List[Path] = []
        for directory in configuration_directories:
            # Skip directories inside one that was already converted. Matching on
            # path components keeps a sibling such as `foo/barbaz` from being
            # skipped just because `foo/bar` was converted first.
            if any(
                _is_same_or_descendant(directory, converted_directory)
                for converted_directory in converted
            ):
                continue
            self.convert_directory(directory)
            converted.append(directory)

        summary = self._repository.MIGRATION_SUMMARY
        glob = self._glob_threshold
        # NOTE(review): a glob threshold of 0 is falsy, so the note below is left
        # out in that case - confirm whether `is not None` is intended.
        if glob:
            summary += (
                f"\n\nConfiguration target automatically expanded to include "
                f"all subtargets, expanding type coverage while introducing "
                f"no more than {glob} fixmes per file."
            )
        title = f"Convert type check targets in {subdirectory} to use configuration"
        self._repository.commit_changes(
            commit=(not self._no_commit),
            title=title,
            summary=summary,
            set_dependencies=False,
        )