tools/generate_taint_models/__init__.py (171 lines of code) (raw):

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

import argparse
import json
import logging
import os
import sys
import time
from dataclasses import dataclass
from itertools import chain
from typing import Dict, List, Optional, Set, Mapping, AbstractSet

from typing_extensions import Final

from ...api.connection import PyreConnection
from ...client import statistics_logger
from .annotated_function_generator import (  # noqa
    AnnotatedFunctionGenerator,
    FunctionVisitor,
    FunctionDefinition,
)
from .generator_specifications import DecoratorAnnotationSpecification  # noqa
from .get_annotated_free_functions_with_decorator import (  # noqa
    AnnotatedFreeFunctionWithDecoratorGenerator,
)
from .get_class_sources import ClassSourceGenerator  # noqa
from .get_constructor_initialized_attribute_sources import (  # noqa
    ConstructorInitializedAttributeSourceGenerator,
)
from .get_django_class_based_view_models import DjangoClassBasedViewModels  # noqa
from .get_dynamic_graphql_sources import DynamicGraphQLSourceGenerator  # noqa
from .get_exit_nodes import ExitNodeGenerator  # noqa
from .get_filtered_sources import FilteredSourceGenerator  # noqa
from .get_globals import GlobalModelGenerator  # noqa
from .get_graphene_models import GrapheneModelsGenerator  # noqa
from .get_graphql_sources import GraphQLSourceGenerator  # noqa
from .get_methods_of_subclasses import MethodsOfSubclassesGenerator  # noqa
from .get_models_filtered_by_callable import ModelsFilteredByCallableGenerator  # noqa
from .get_request_specific_data import RequestSpecificDataGenerator  # noqa
from .get_REST_api_sources import RESTApiSourceGenerator  # noqa
from .get_undecorated_sources import UndecoratedSourceGenerator  # noqa
from .model import Model
from .model_generator import ModelGenerator


LOG: logging.Logger = logging.getLogger(__name__)


@dataclass
class GenerationArguments:
    """
    Options controlling a model generation run.

    When adding new generation options, make sure to add a default value for
    them for backwards compatibility. We construct GenerationArguments objects
    outside the current directory, and adding a non-optional argument will
    break those.
    """

    # Generator modes requested by the caller; None means none were given.
    mode: Final[Optional[List[str]]]
    # Whether verbose (DEBUG-level) logging was requested.
    verbose: bool
    # Directory to write `.pysa` files to; None prints the models to stdout.
    output_directory: Final[Optional[str]]
    # Pyre arguments.
    no_saved_state: bool = False
    isolation_prefix: Optional[str] = None
    stop_pyre_server: bool = False


def _file_exists(path: str) -> str:
    """
    Return `path` unchanged if something exists at that location.

    Used as an argparse `type=` callable for `--output-directory`.

    Raises:
        ValueError: if nothing exists at `path`.
    """
    if not os.path.exists(path):
        # The `f` prefix was previously missing, so the message contained the
        # literal text `{path}` instead of the offending path.
        raise ValueError(f"No file at `{path}`")
    return path


def _parse_arguments(
    generator_options: Dict[str, ModelGenerator[Model]]
) -> GenerationArguments:
    """
    Parse the process command line into a GenerationArguments object.

    `generator_options` supplies the set of names accepted by `--mode`, which
    may be passed several times.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose logging")
    parser.add_argument("--mode", action="append", choices=generator_options.keys())
    parser.add_argument(
        "--no-saved-state", action="store_true", help="Disable pyre saved state"
    )
    parser.add_argument(
        "--isolation-prefix", type=str, help="Buck isolation prefix when running pyre"
    )
    parser.add_argument(
        "--stop-pyre-server", action="store_true", help="Stop the pyre server once done"
    )
    parser.add_argument(
        "--output-directory", type=_file_exists, help="Directory to write models to"
    )
    arguments: argparse.Namespace = parser.parse_args()
    return GenerationArguments(
        mode=arguments.mode,
        verbose=arguments.verbose,
        output_directory=arguments.output_directory,
        no_saved_state=arguments.no_saved_state,
        isolation_prefix=arguments.isolation_prefix,
        stop_pyre_server=arguments.stop_pyre_server,
    )


def _report_results(
    models: Mapping[str, AbstractSet[Model]], output_directory: Optional[str]
) -> None:
    """
    Emit the generated models.

    With an output directory, write one sorted `generated_<name>.pysa` file
    per generator and print a JSON summary with the total model count.
    Without one, print every model, sorted, to stdout.
    """
    if output_directory is not None:
        for name in models:
            # Try to be slightly intelligent in how we name files.
            if name.startswith("get_"):
                filename = f"generated_{name[4:]}"
            else:
                filename = f"generated_{name}"
            with open(f"{output_directory}/{filename}.pysa", "w") as output_file:
                output_file.write(
                    "\n".join([str(model) for model in sorted(models[name])])
                )
                output_file.write("\n")
        print(
            json.dumps(
                {
                    "number of generated models": sum(
                        len(generated_models) for generated_models in models.values()
                    )
                }
            )
        )
    else:
        all_models = chain.from_iterable(models.values())
        print("\n".join([str(model) for model in sorted(all_models)]))


def run_from_parsed_arguments(
    generator_options: Dict[str, ModelGenerator[Model]],
    arguments: GenerationArguments,
    default_modes: List[str],
    logger_executable: Optional[str] = None,
    include_default_modes: bool = False,
    pyre_connection: Optional[PyreConnection] = None,
) -> None:
    """
    Run the selected generators and report the models they produce.

    Args:
        generator_options: available generators, keyed by mode name.
        arguments: already-parsed generation options.
        default_modes: modes to run when none are requested, or in addition to
            the requested ones when `include_default_modes` is set.
        logger_executable: if given, per-mode timing is sent to the statistics
            logger through this executable.
        include_default_modes: run `default_modes` alongside requested modes.
        pyre_connection: if given, receives the pyre-related options from
            `arguments` and is optionally stopped once generation is done.
    """
    argument_modes = arguments.mode or []
    if not argument_modes or include_default_modes:
        # De-duplicate while keeping first-seen order, so modes run (and log)
        # in a stable order; a plain set's iteration order is not stable
        # across interpreter runs.
        modes = list(dict.fromkeys(argument_modes + default_modes))
    else:
        modes = argument_modes

    if pyre_connection is not None and arguments.no_saved_state:
        pyre_connection.add_arguments("--no-saved-state")
    isolation_prefix = arguments.isolation_prefix
    if pyre_connection is not None and isolation_prefix is not None:
        pyre_connection.add_arguments("--isolation-prefix", isolation_prefix)

    generated_models: Dict[str, Set[Model]] = {}
    for mode in modes:
        LOG.info("Computing models for `%s`", mode)
        if mode not in generator_options:
            LOG.warning("Unknown mode `%s`, skipping.", mode)
            continue
        start = time.time()
        generated_models[mode] = set(generator_options[mode].generate_models())
        elapsed_time_seconds = time.time() - start
        LOG.info(
            "Computed models for `%s` in %.3f seconds.", mode, elapsed_time_seconds
        )

        if logger_executable is not None:
            elapsed_time_milliseconds = int(elapsed_time_seconds * 1000)
            statistics_logger.log(
                statistics_logger.LoggerCategory.PERFORMANCE,
                integers={"time": elapsed_time_milliseconds},
                normals={
                    "name": "model generation",
                    "model kind": mode,
                    "command_line": " ".join(sys.argv),
                },
                logger=logger_executable,
            )
    _report_results(generated_models, arguments.output_directory)

    if pyre_connection is not None and arguments.stop_pyre_server:
        LOG.info("Stopping the pyre server.")
        pyre_connection.stop_server(ignore_errors=True)


def run_generators(
    generator_options: Dict[str, ModelGenerator[Model]],
    default_modes: List[str],
    verbose: bool = False,
    logger_executable: Optional[str] = None,
    include_default_modes: bool = False,
    pyre_connection: Optional[PyreConnection] = None,
) -> None:
    """
    Command-line entry point: parse arguments, configure logging, then run.

    Logging is set to DEBUG when either the `verbose` parameter or the
    `--verbose` flag is set, and to INFO otherwise. The remaining parameters
    are forwarded to `run_from_parsed_arguments`.
    """
    arguments = _parse_arguments(generator_options)
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        level=logging.DEBUG if verbose or arguments.verbose else logging.INFO,
    )
    run_from_parsed_arguments(
        generator_options,
        arguments,
        default_modes,
        logger_executable,
        include_default_modes=include_default_modes,
        pyre_connection=pyre_connection,
    )