utils/preflight_check.py (323 lines of code) (raw):

#!/usr/bin/env python3 """ Perform pre-flight checks for a training run. This script will exercise the taskgraph and output information about the training steps, and how they are organized. This is helpful when debugging and understanding pipeline changes, or when writing a training config. Usage: task preflight-check # Open the graph task preflight-check -- --open_graph # Only show one section, like the task_group. task preflight-check -- --only task_group """ import argparse import http.server import json import os import socket import sys import urllib import webbrowser from enum import Enum from textwrap import dedent from typing import Any, Callable, Optional, Union import requests import taskgraph.actions import taskgraph.parameters from blessed import Terminal from taskgraph.config import load_graph_config from taskgraph.util import yaml current_folder = os.path.dirname(os.path.abspath(__file__)) artifacts_folder = os.path.join(current_folder, "../artifacts") term = Terminal() # The parameters are a read only dict. The class is not exported, so this is a close # approximation of the type. Parameters = dict[str, Any] # The type for the dependency injection of webbrowser.open. OpenInBrowser = Callable[[str], None] def load_yml(filename: str) -> Any: with open(filename) as f: return yaml.load_stream(f) def get_taskgraph_parameters() -> Parameters: # These are required by taskgraph. os.environ["TASK_ID"] = "fake_id" os.environ["RUN_ID"] = "0" os.environ["TASKCLUSTER_ROOT_URL"] = "https://firefox-ci-tc.services.mozilla.com" # Load taskcluster/config.yml graph_config = load_graph_config("taskcluster") # Add the project's taskgraph directory to the python path, and register # any extensions present. graph_config.register() parameters = taskgraph.parameters.load_parameters_file(None, strict=False) parameters.check() # Example parameters: # { # 'base_ref': '', # 'base_repository': 'git@github.com:mozilla/translations.git', # 'base_rev': '', # 'build_date': 1704894563, # 'build_number': 1, # 'do_not_optimize': [], # 'enable_always_target': True, # 'existing_tasks': {}, # 'filters': ['target_tasks_method'], # 'head_ref': 'main', # 'head_repository': 'git@github.com:mozilla/translations.git', # 'head_rev': 'e48440fc2c52da770d0f652a32583eae3450766f', # 'head_tag': '', # 'level': '3', # 'moz_build_date': '20240110074923', # 'next_version': None, # 'optimize_strategies': None, # 'optimize_target_tasks': True, # 'owner': 'nobody@mozilla.com', # 'project': 'translations', # 'pushdate': 1704894563, # 'pushlog_id': '0', # 'repository_type': 'git', # 'target_tasks_method': 'default', # 'tasks_for': '', # 'training_config': { ... }, # 'version': None # } return parameters _last_config_path = None def get_training_config(cfg_path: str): cfg_path = os.path.realpath(cfg_path) global _last_config_path # noqa: PLW0602 if _last_config_path: if cfg_path != _last_config_path: raise Exception( "Changing the config paths and re-running run_taskgraph is not supported." ) # Don't regenerate the taskgraph for tests, as this can be slow. It's likely that # tests will exercise this codepath. return return load_yml(cfg_path) def run_taskgraph(cfg_path: str, parameters: Parameters) -> None: # The callback can be a few standard things like "cancel" and "rerun". Custom actions # can be created in taskcluster/translations_taskgraph/actions/ such as the train action. callback = "train" input = get_training_config(cfg_path) if not input: # This is probably a test run. return # This command outputs the stdout. Ignore it here. stdout = sys.stdout devnull = open(os.devnull, "w") sys.stdout = devnull # This invokes train_action in taskcluster/translations_taskgraph/actions/train.py taskgraph.actions.trigger_action_callback( task_group_id=None, task_id=None, input=input, callback=callback, parameters=parameters, root="taskcluster", test=True, ) sys.stdout = stdout def pretty_print_training_config(cfg_path: str) -> None: text = dedent( f""" {term.yellow_underline("Training config (JSON)")} The training config in YAML gets converted to JSON, and can sometimes be interpreted incorrectly. Verify the config here. """ ) print(text) training_config = get_training_config(cfg_path) print(term.gray(json.dumps(training_config, indent=2))) def pretty_print_artifacts_dir() -> None: docs = { "actions.json": """ Contains the valid actions, such as: "retrigger", "add-new-jobs", "rerun", "cancel", "cancel-all", "rebuild-cached-tasks", "retrigger-multiple) """, "full-task-graph.json": """ The full list of every task, such as. { "alignments-en-ru": { ... }, "all-pipeline-en-ru-1": { ... }, "bicleaner-opus-Books_v1-en-ru": { ... }, ... } """, "label-to-taskid.json": """ For example: { "alignments-en-ru": "CJwUWGPIQ4CdeinnE_NtNQ", "all-pipeline-en-ru-1": "DjZKX9lkTvmQDbnmFtNOBg", "bicleaner-opus-Books_v1-en-ru": "fukq_91DQ4S9verrlipC1A", ... } """, "parameters.yml": """ Given the .yml config, this is what is applied to generate the tasks. """, "runnable-jobs.json": """ I'm not sure, but it appears to be build-* and fetch-* jobs? Maybe it's the first round of tasks that can be run? """, "target-tasks.json": """ The dummy target for training, such as ["all-pipeline-en-ru-1"] """, "task-graph.json": """ The full task graph DAG. e.g. { "A6tejCuJSBmOeykhkUVZgw": { ... "dependencies": { ... }, "task": { ... } ... } ... } """, } ignore = ["fetch-content", "run-task"] text = dedent( f""" {term.yellow_underline("Artifacts")} The pre-flight check outputs the full description of the tasks to the artifacts directory. This can be useful for verifying configuration of the run. {term.gray(" .")}""" ) print(text) files = os.listdir(artifacts_folder) for i, file in enumerate(files): if file in ignore: continue doc_icon = " " if i + 1 == len(files) else "│" file_icon = "└──" if i + 1 == len(files) else "├──" doc_lines = dedent(docs.get(file, "")).split("\n") file_entry = term.underline(f"artifacts/{file}") print(term.white(f" {file_icon} {file_entry}")) for doc_line in doc_lines: print(f" {term.white(doc_icon)} {term.gray(doc_line)}") def get_free_port() -> int: # https://stackoverflow.com/questions/1365265/on-localhost-how-do-i-pick-a-free-port-number sock = socket.socket() sock.bind(("", 0)) return sock.getsockname()[1] waiting_for_request = True def pretty_print_cmd(command_union: Optional[Union[list[str], list[list[str]]]]): """Pretty print the cmd. It could be a nested array.""" if not command_union: return # Check for nested commands. if isinstance(command_union[0], list): for subcommand in command_union: assert isinstance(subcommand, list) # Recurse into the subcommand pretty_print_cmd(subcommand) return command: list[str] = command_union # type: ignore # This command is not useful to display. if " ".join(command) == "chmod +x run-task": return # Many commands are invoked by bash. Hide that in the output. # Example: # ['/usr/local/bin/run-task', '--translations-checkout=/builds/worker/checkouts/vcs/', # '--task-cwd', '/builds/worker/checkouts/vcs', '--', 'bash', '-cx', 'make validate-taskgraph'] try: index = command.index("bash") print(command) command = command[index + 2 :] except ValueError: pass try: index = command.index("/builds/worker/checkouts") command = command[index + 2 :] except ValueError: pass # This is a bit of a hacky way to not create a newline for `--`. delimiter = "#-#-#-#-#-#-#-#-#-#-#-#" if delimiter in command: raise Exception("Delimiter found in command, change the delimiter") subcommands = [] for subcommand in " ".join(command).split("&&"): subcommands.append( ( subcommand # No newlines for `command.sh -- extra_args`. .replace("-- ", delimiter) # Remove whitespace. .strip() # Create newlines for flags. .replace("--", "\\\n --") # Put the `--` back. .replace(delimiter, "-- ") ) ) command_text = "\n ".join(subcommands) print(f" {term.gray(command_text)}") task_graph = None def load_taskgraph() -> dict[str, dict]: global task_graph if not task_graph: with open(os.path.join(artifacts_folder, "full-task-graph.json"), "rb") as file: task_graph = json.load(file) return task_graph def pretty_print_task_graph() -> None: text = dedent( f""" {term.yellow_underline("Task Commands")} The following is a full list of the tasks and their commands to run them. """ ) print(text) for key, value in load_taskgraph().items(): print(f"{term.cyan_bold_underline(key)}") print(f" {term.white_bold(value['task']['metadata']['description'])}") pretty_print_cmd(value["task"]["payload"].get("command")) def serve_taskgraph_file( graph_url, open_graph: bool, persist_graph: bool, open_in_browser: OpenInBrowser ) -> None: """ Serves the taskgraph file so that it can be opened in the taskcluster tools graph. """ if not open_graph: text = dedent( f""" {term.yellow_underline("Visualization")} To open a visualization of the task add --open_graph to the arguments, or drag the file {term.white_underline("artifacts/task-graph.json")} into: {term.white_underline("https://gregtatum.github.io/taskcluster-tools/")} """ ) print(text) return port = get_free_port() json_url = f"http://localhost:{port}" graph_url_final = f"{graph_url}/?taskGraph={urllib.parse.quote(json_url)}" # type: ignore open_in_browser(graph_url_final) server = http.server.HTTPServer(("", port), ServeArtifactFile) if persist_graph: print("Serving the graph:", term.underline(graph_url_final)) print("Hit Ctrl-C to exit") while (waiting_for_request or persist_graph) and open_in_browser is webbrowser.open: server.handle_request() text = dedent( f""" {term.yellow_underline("Visualization")} The taskgraph structure was opened in TaskCluster tools. This represents a graph of the relationships of the tasks that will be used in training. """ ) print(text) class ServeArtifactFile(http.server.BaseHTTPRequestHandler): """Creates a one-time server that just serves one file.""" def _set_headers(self): self.send_response(200) self.send_header("Content-type", "application/json") self.send_header("Access-Control-Allow-Origin", "*") self.end_headers() def log_message(self, *args): # type: ignore # Disable server logging. pass def do_HEAD(self): self._set_headers() def do_GET(self): self._set_headers() task_graph_path = os.path.join(artifacts_folder, "task-graph.json") try: with open(task_graph_path, "rb") as file: self.wfile.write(file.read()) except Exception as exception: print("Failed to serve the file", exception) pass global waiting_for_request waiting_for_request = False Choices = Enum( "Choices", [ "task_group", "artifacts", "training_config", "graph", "url_mounts", ], ) def is_url_ok(url) -> bool: try: response = requests.head(url) return response.ok except Exception: return False def check_url_mounts(): text = dedent( f""" {term.yellow_underline("URL Mounts")} Check that mounted URLs, such as pretrained models are valid. """ ) print(text) has_bad_url = False has_mounts = False for task_name, task in load_taskgraph().items(): mounts = task.get("task", {}).get("payload", {}).get("mounts", []) # Only keep mounts that are external URLs. mounts = [ mount for mount in mounts if ( # This could be a cache mount. "content" in mount # This is an internal URL. and not mount["content"]["url"].startswith( "https://firefox-ci-tc.services.mozilla.com" ) ) ] if len(mounts) == 0: continue has_mounts = True print(term.cyan_bold_underline(f'Mounts for "{task_name}"')) for mount in mounts: if "content" not in mount: continue url: str = mount["content"]["url"] if url.startswith("https://firefox-ci-tc.services.mozilla.com"): # This is an internal URL. continue if is_url_ok(url): print(term.green("✓"), term.gray(url)) else: print(term.red(f"❌ {url}")) has_bad_url = True if not has_mounts: print(term.gray("No mounts presents")) if has_bad_url: sys.exit(1) def main( args: Optional[list[str]] = None, open_in_browser: OpenInBrowser = webbrowser.open, # type: ignore ) -> None: parser = argparse.ArgumentParser( description=__doc__, formatter_class=argparse.RawTextHelpFormatter, # Preserves whitespace in the help text. ) parser.add_argument( "--config", default="taskcluster/configs/config.prod.yml", type=str, help='The path to the training config. Defaults to "taskcluster/configs/config.prod.yml"', ) parser.add_argument( "--only", default=None, type=str, choices=[c.name for c in Choices], help="Only output one section", ) parser.add_argument( "--open_graph", action="store_true", help="Open the graph visualization in a browser" ) parser.add_argument( "--persist_graph", action="store_true", help="Keep serving the graph indefinitely" ) parser.add_argument( "--graph_url", default="https://gregtatum.github.io/taskcluster-tools", help="Override the graph URL (for local testing)", ) parsed_args = parser.parse_args(args) # Build the artifacts folder. run_taskgraph(parsed_args.config, get_taskgraph_parameters()) choice = Choices[parsed_args.only] if parsed_args.only else None if choice == Choices.task_group: pretty_print_task_graph() elif choice == Choices.artifacts: pretty_print_artifacts_dir() elif choice == Choices.training_config: pretty_print_training_config(parsed_args.config) elif choice == Choices.graph: serve_taskgraph_file( parsed_args.graph_url, parsed_args.open_graph, parsed_args.persist_graph, open_in_browser, ) elif choice == Choices.url_mounts: check_url_mounts() elif choice is None: pretty_print_task_graph() pretty_print_artifacts_dir() pretty_print_training_config(parsed_args.config) serve_taskgraph_file( parsed_args.graph_url, parsed_args.open_graph, parsed_args.persist_graph, open_in_browser, ) check_url_mounts() if __name__ == "__main__": main()