tools/playground/application.py (198 lines of code) (raw):

# Copyright (c) Meta Platforms, Inc. and affiliates. # # This source code is licensed under the MIT license found in the # LICENSE file in the root directory of this source tree. # pyre-unsafe import argparse import json import logging import os import subprocess import tempfile import threading from pathlib import Path from typing import IO, List from flask import Flask, jsonify, request from flask_cors import CORS # pyre-fixme[21]: pyre cannot seem to find this module from flask_socketio import emit, SocketIO logging.basicConfig( format="%(asctime)s %(levelname)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", level=logging.DEBUG, ) LOG: logging.Logger = logging.getLogger(__name__) CUSTOM_PYSA_MODEL_FILE: str = "custom.pysa" WATCHMAN_CONFIG_FILE: str = ".watchmanconfig" PYRE_CONFIG_FILE: str = ".pyre_configuration" INPUT_FILE: str = "input.py" def _consume(stream: IO[str]) -> str: buffer: List[str] = [] def _consume() -> None: while True: line = stream.readline() if line: decoded = line.strip() LOG.debug(decoded) buffer.append(decoded) else: break thread = threading.Thread(target=_consume) thread.start() thread.join() return "\n".join(buffer) class Pyre: def __init__(self) -> None: self._directory: Path = Path(tempfile.mkdtemp()) LOG.debug(f"Starting server in `{self._directory}`...") pyre_configuration = json.dumps( { "source_directories": ["."], } ) LOG.debug(f"Writing configuration:\n{pyre_configuration}") pyre_configuration_path = self._directory / PYRE_CONFIG_FILE pyre_configuration_path.write_text(pyre_configuration) LOG.debug("Writing watchman configuration") watchman_configuration_path = self._directory / WATCHMAN_CONFIG_FILE watchman_configuration_path.write_text("{}\n") LOG.debug("Initializing the code") code_path = self._directory / INPUT_FILE code_path.write_text("x = 0\n") LOG.debug("Starting watchman") subprocess.check_call(["watchman", "watch", str(self._directory)]) LOG.debug("Priming the server") subprocess.check_call( ["pyre", "--noninteractive"], cwd=self._directory, ) def check(self, input: str) -> str: LOG.debug("Running pyre check") code_path = self._directory / INPUT_FILE code_path.write_text(input) with subprocess.Popen( ["pyre", "--output=json", "--noninteractive"], stderr=subprocess.PIPE, stdout=subprocess.PIPE, cwd=self._directory, text=True, ) as process: # pyre-fixme[6]: Expected `IO[bytes]` for 1st param but got # `Optional[IO[typing.Any]]`. stderr = _consume(process.stderr) # pyre-fixme[6]: Expected `IO[bytes]` for 1st param but got # `Optional[IO[typing.Any]]`. stdout = _consume(process.stdout) return_code = process.wait() if return_code > 1: LOG.error(f"Returning error: {stderr}") result = jsonify(errors=[stderr]) else: errors = json.loads(stdout) result = jsonify(data={"errors": errors, "stderr": stderr}) return result class Pysa: def __init__( self, input: str, model: str = "", use_builtin_pysa_models: bool = False ) -> None: self._directory: Path = Path(tempfile.mkdtemp()) self._stubs: Path = Path(tempfile.mkdtemp()) LOG.debug(f"Intializing Pysa in `{self._directory}`...") pyre_configuration = json.dumps( { "source_directories": ["."], "taint_models_path": [ str(self._stubs), os.environ["PYSA_PLAYGROUND_TAINT_MODELS"], ] if use_builtin_pysa_models else str(self._stubs), "search_path": [str(self._stubs), os.environ["PYSA_PLAYGROUND_STUBS"]], } ) LOG.debug(f"Writing configuration:\n{pyre_configuration}") pyre_configuration_path = self._directory / PYRE_CONFIG_FILE pyre_configuration_path.write_text(pyre_configuration) if model: LOG.debug("Writing custom model to pysa file") model_path = self._stubs / CUSTOM_PYSA_MODEL_FILE model_path.write_text(model) LOG.debug(f"Writing code:\n{input}") code_path = self._directory / INPUT_FILE code_path.write_text(input) def analyze(self) -> None: LOG.debug("Running pysa") with subprocess.Popen( ["pyre", "-n", "analyze"], stderr=subprocess.PIPE, stdout=subprocess.PIPE, cwd=self._directory, text=True, ) as process: model_verification_errors = [] # pyre-fixme[16]: process.stderr is marked as Optional for line in iter(process.stderr.readline, b""): line = line.rstrip() if line == "": break elif "ERROR" in line and "is not part of the environment" in line: model_verification_errors.append(line) elif "INFO" in line or "ERROR" in line: if model_verification_errors: # Emit all model verification lines together to prevent # network overhead. model_verification_error_output = "\n".join( model_verification_errors ) emit( "pysa_results_channel", { "type": "output", "line": model_verification_error_output, }, ) LOG.debug(model_verification_error_output) model_verification_errors = [] emit("pysa_results_channel", {"type": "output", "line": line}) LOG.debug(line) return_code = process.wait() if return_code != 0: result = {"type": "finished", "result": "error"} else: result = {"type": "finished", "result": "ok"} emit("pysa_results_channel", result) def get_server(): application = Flask(__name__) # You may need to modify the origin to the pyre-check website # before deployment. CORS(application) socketio = SocketIO(application, cors_allowed_origins="*") LOG.info("Initializizing the pyre server") pyre = Pyre() LOG.info("Pyre server is initialized, configuring application routes") @application.route("/check", methods=["GET", "POST"]) def check() -> str: input = ( request.args.get("input") or request.form.get("input") or request.json.get("input") ) if input is None: return jsonify(errors=["Input not provided"]) LOG.info(f"Checking `{input}`...") return pyre.check(input) @socketio.on("analyze", namespace="/analyze") def analyze(json) -> None: input = json.get("input", None) use_builtin_pysa_models = json.get("use_builtin_pysa_models", False) model = json.get("model", "") if input is None: emit( "pysa_results_channel", { "type": "finished", "result": "error", "reason": "No code given to analyze.", }, ) else: pysa = Pysa(input, model, use_builtin_pysa_models) LOG.info(f"Checking `{input}`...") pysa.analyze() @application.route("/") def index() -> str: return "404" return application, socketio def run_server(debug: bool) -> None: application, socketio = get_server() socketio.run(application, debug=debug) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--debug", action="store_true") arguments: argparse.Namespace = parser.parse_args() run_server(debug=arguments.debug)