# utils/ryzenai/notification_service.py

# Copyright 2020 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import math
import os
import re
import sys
import time
import traceback
from typing import Dict

import requests
from slack_sdk import WebClient


sys.path.append(os.getcwd())
import tests.ryzenai.testing_utils as tu  # noqa

client = WebClient(token=os.environ["CI_SLACK_BOT_TOKEN"])


def infer_model_id(model):
    model_name_replacement = model.replace(".", "_").replace("-", "_")
    if "timm" in model:
        all_model_names = list(tu.PYTORCH_TIMM_MODEL["default-timm-config"].keys())
    elif "amd" in model:
        all_model_names = (
            tu.RYZEN_PREQUANTIZED_MODEL_IMAGE_CLASSIFICATION
            + list(tu.RYZEN_PREQUANTIZED_MODEL_OBJECT_DETECTION.values())
            + tu.RYZEN_PREQUANTIZED_MODEL_IMAGE_SEGMENTATION
            + tu.RYZEN_PREQUANTIZED_MODEL_IMAGE_TO_IMAGE
            + tu.RYZEN_PREQUANTIZED_MODEL_CUSTOM_TASKS
        )
    else:
        return model

    for model_name in all_model_names:
        if model_name.replace(".", "_").replace("-", "_") == model_name_replacement:
            return model_name

    return model


def get_jobs(workflow_run_id, token=None):
    """Extract jobs in a GitHub Actions workflow run"""
    headers = None
    if token is not None:
        headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {token}"}

    url = f"https://api.github.com/repos/huggingface/optimum-amd/actions/runs/{workflow_run_id}/jobs?per_page=100"
    result = requests.get(url, headers=headers).json()
    jobs = []

    try:
        jobs.extend(result["jobs"])
        pages_to_iterate_over = math.ceil((result["total_count"] - 100) / 100)

        for i in range(pages_to_iterate_over):
            result = requests.get(url + f"&page={i + 2}", headers=headers).json()
            jobs.extend(result["jobs"])

        return jobs
    except Exception:
        print(f"Unknown error, could not fetch links:\n{traceback.format_exc()}")

    return []


def handle_test_results(test_results):
    expressions = test_results.split(" ")

    failed = 0
    success = 0

    # When the output is short enough, the output is surrounded by = signs: "== OUTPUT =="
    # When it is too long, those signs are not present.
    time_spent = expressions[-2] if "=" in expressions[-1] else expressions[-1]

    for i, expression in enumerate(expressions):
        if "failed" in expression:
            failed += int(expressions[i - 1])
        if "passed" in expression:
            success += int(expressions[i - 1])

    return failed, success, time_spent


def handle_stacktraces(test_results):
    # These files should follow the following architecture:
    # === FAILURES ===
    # <path>:<line>: Error ...
    # <path>:<line>: Error ...
    # <empty line>

    total_stacktraces = test_results.split("\n")[1:-1]
    stacktraces = []
    for stacktrace in total_stacktraces:
        try:
            line = stacktrace[: stacktrace.index(" ")].split(":")[-2]
            error_message = stacktrace[stacktrace.index(" ") :]

            stacktraces.append(f"(line {line}) {error_message}")
        except Exception:
            stacktraces.append("Cannot retrieve error message.")

    return stacktraces
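

# Illustrative sketch of the artifact formats the two helpers above expect. The
# sample strings are hypothetical (nothing in this file produces them); this
# helper exists only as documentation and is never called by the CI flow.
def _example_parse_test_artifacts():
    # A short pytest "stats" line is wrapped in = signs, e.g. "== ... ==".
    failed, success, time_spent = handle_test_results("== 2 failed, 10 passed in 0:02:15 ==")
    assert (failed, success, time_spent) == (2, 10, "0:02:15")

    # A "failures_line" artifact: a header line, one "<path>:<line>: <error>"
    # entry per failure, and a trailing empty line.
    failures = "=== FAILURES ===\ntests/test_a.py:42: AssertionError: ops mismatch\n"
    assert handle_stacktraces(failures) == ["(line 42)  AssertionError: ops mismatch"]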


class Message:
    def __init__(
        self,
        title: str,
        ci_title: str,
        results: Dict,
    ):
        self.title = title
        self.ci_title = ci_title

        # Failures and success of the modeling tests
        self.n_success = sum(r["success"] for r in results.values())
        self.n_failures = sum(r["failed"] for r in results.values())
        self.n_tests = self.n_failures + self.n_success
        self.model_results = results

        self.thread_ts = None

    @property
    def time(self) -> str:
        all_results = [*self.model_results.values()]
        time_spent = [r["time_spent"].split(", ")[0] for r in all_results if len(r["time_spent"])]
        total_secs = 0

        for t in time_spent:
            time_parts = t.split(":")

            # Time can be formatted as xx:xx:xx, as .xx, or as x.xx if the time spent was less than a minute.
            if len(time_parts) == 1:
                time_parts = [0, 0, time_parts[0]]

            hours, minutes, seconds = int(time_parts[0]), int(time_parts[1]), float(time_parts[2])
            total_secs += hours * 3600 + minutes * 60 + seconds

        hours, minutes, seconds = total_secs // 3600, (total_secs % 3600) // 60, total_secs % 60
        return f"{int(hours)}h{int(minutes)}m{int(seconds)}s"

    @property
    def header(self) -> Dict:
        return {"type": "header", "text": {"type": "plain_text", "text": self.title}}

    @property
    def ci_title_section(self) -> Dict:
        return {"type": "section", "text": {"type": "mrkdwn", "text": self.ci_title}}

    @property
    def no_failures(self) -> Dict:
        return {
            "type": "section",
            "text": {
                "type": "plain_text",
                "text": f"🌞 There were no failures: all {self.n_tests} tests passed.\nThe suite ran in {self.time}.",
                "emoji": True,
            },
            "accessory": {
                "type": "button",
                "text": {"type": "plain_text", "text": "Check Action results", "emoji": True},
                "url": f"https://github.com/huggingface/optimum-amd/actions/runs/{os.environ['GITHUB_RUN_ID']}",
            },
        }

    @property
    def failures(self) -> Dict:
        return {
            "type": "section",
            "text": {
                "type": "plain_text",
                "text": (
                    f"There were {self.n_failures} failures, out of {self.n_tests} tests.\n"
                    f"The suite ran in {self.time}."
                ),
                "emoji": True,
            },
            "accessory": {
                "type": "button",
                "text": {"type": "plain_text", "text": "Check Action results", "emoji": True},
                "url": f"https://github.com/huggingface/optimum-amd/actions/runs/{os.environ['GITHUB_RUN_ID']}",
            },
        }

    @property
    def category_failures(self) -> Dict:
        category_failures = []

        for key in self.model_results:
            report = self.model_results[key]
            report = f"{str(report['failed']).rjust(6)} | {str(report['success']).rjust(7)} | {key}"
            category_failures.append(report)

        header = "Failed | Success | Category \n"
        category_failures_report = prepare_reports(title="*Summary*", header=header, reports=category_failures)

        return {"type": "section", "text": {"type": "mrkdwn", "text": category_failures_report}}
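
    # For illustration: with hypothetical counts, `category_failures` renders a
    # fixed-width summary table such as the following inside a Slack mrkdwn block:
    #
    #   Failed | Success | Category
    #        1 |       9 | Pre-Quantized Model
    #        0 |      12 | Timm Quantization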

    @property
    def model_failures(self):
        # Load baseline data from a JSON file
        with open(tu.BASELINE_JSON, "r") as json_file:
            baseline_data = json.load(json_file)

        model_failure_sections = []
        text = (
            "The following section presents category-wise failures for models, illustrating "
            "the total number of DPU and CPU operators associated with each failure. If a failure is "
            "attributed to regression (indicated as 'Reg.' in the table), baseline values are provided "
            "in parentheses. For other failures, operator values are not displayed. Please refer to the "
            "post reply for additional details on these failures."
        )
        model_failure_sections.append(
            {"type": "header", "text": {"type": "plain_text", "text": "Category wise failures", "emoji": True}},
        )
        content = {"type": "section", "text": {"type": "plain_text", "text": text, "emoji": True}}
        model_failure_sections.append(content)

        for i, (key, result) in enumerate(self.model_results.items()):
            failures_info = []
            for failure in result["failures"]:
                # Extract information from failure details
                line = failure["line"]
                trace = failure["trace"]

                # Identify model_id based on the failure line
                model_id = self.extract_model_id(line)
                model_id = infer_model_id(model_id)

                # Get baseline values for the identified model_id
                baseline_ops = baseline_data.get(model_id.lower().replace("/", "_"), {})

                # Extract baseline values
                cpu_baseline_value = baseline_ops.get("cpu", 0)
                dpu_baseline_value = baseline_ops.get("dpu", 0)
                all_baseline_value = baseline_ops.get("all", 0)

                # Extract and compare values from the failure trace
                all_value_str, dpu_value_str, cpu_value_str, regressed = self.extract_operator_values(
                    trace, all_baseline_value, dpu_baseline_value, cpu_baseline_value
                )

                # Append information about the failure
                failures_info.append(
                    f"{all_value_str.rjust(9)} | {dpu_value_str.rjust(9)} | {cpu_value_str.rjust(9)} | {regressed.rjust(4)} | {model_id[:40]}"
                )

            if len(failures_info):
                # Prepare model failure sections
                model_failure_sections.extend(
                    self.prepare_model_failure_sections(i + 1, key, result["job_link"], failures_info)
                )

        return model_failure_sections

    def extract_model_id(self, line):
        # Extract model_id based on the line content
        if "amd" in line:
            match = re.search(r"::test_model_\d+_amd_([a-zA-Z0-9_-]+)", line)
            if match:
                return "amd/" + match.group(1)
        elif "timm" in line:
            match = re.search(r"default_timm_config_image_classification_timm_(\w+)", line)
            if match:
                return "timm/" + match.group(1)

        raise ValueError("Model id could not be determined!")

    def extract_operator_values(self, trace, all_baseline_value, dpu_baseline_value, cpu_baseline_value):
        # Extract values from trace and compare with baseline
        if "DPU operators do not match!" in trace or "Total operators do not match!" in trace:
            match = re.search(r"\{'all': (\d+), 'dpu': (\d+), 'cpu': (\d+)\}", trace)
            all_value = int(match.group(1))
            dpu_value = int(match.group(2))
            cpu_value = int(match.group(3))

            # Process values and compare with baseline
            all_value_str = f"{all_value}({all_baseline_value})" if "Total" in trace else str(all_value)
            dpu_value_str = (
                f"{dpu_value}({dpu_baseline_value})" if dpu_value != dpu_baseline_value else str(dpu_baseline_value)
            )
            cpu_value_str = (
                f"{cpu_value}({cpu_baseline_value})" if cpu_value != cpu_baseline_value else str(cpu_baseline_value)
            )
            regressed = "Y"
        else:
            # No regression, do not print values
            cpu_value_str = "-"
            dpu_value_str = "-"
            all_value_str = "-"
            regressed = "N"

        return all_value_str, dpu_value_str, cpu_value_str, regressed
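
    # For illustration (hypothetical trace): given
    #   "AssertionError: DPU operators do not match! {'all': 300, 'dpu': 280, 'cpu': 20}"
    # and baselines all=300, dpu=290, cpu=10, `extract_operator_values` returns
    #   ("300", "280(290)", "20(10)", "Y")
    # i.e. counts that drifted from the baseline carry the baseline in parentheses.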

    def prepare_model_failure_sections(self, idx, key, job_link, failures_info):
        # Prepare sections for model failures
        model_failure_sections = []

        # Section for failure information and a button to check results
        model_failure_sections.append(
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"*{idx}. {key}*",
                },
                "accessory": {
                    "type": "button",
                    "text": {"type": "plain_text", "text": "Check results", "emoji": True},
                    "url": job_link,
                },
            }
        )

        # Section for detailed failure reports
        model_header = "Total Ops | DPU Ops | CPU Ops | Reg. | Model\n"
        model_failures_report = prepare_reports(title="", header=model_header, reports=failures_info)
        model_failure_sections.append(
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": model_failures_report},
            }
        )

        # Save detailed failure report to a file
        model_failures_report = prepare_reports(
            title=f"{idx}. {key}",
            header=model_header,
            reports=failures_info,
            to_truncate=False,
        )
        self.save_failure_report_to_file(key, model_failures_report)

        return model_failure_sections

    def save_failure_report_to_file(self, key, model_failures_report):
        # Save detailed failure report to a file
        os.makedirs(os.path.join(os.getcwd(), "prev_ci_results"), exist_ok=True)
        file_path = os.path.join(
            os.getcwd(), "prev_ci_results", f"model_failures_report_{key.replace(' ', '_').replace('-', '_')}.txt"
        )
        with open(file_path, "w", encoding="UTF-8") as fp:
            fp.write(model_failures_report)

    @property
    def payload(self) -> str:
        blocks = [self.header]

        if self.ci_title:
            blocks.append(self.ci_title_section)

        if self.n_failures > 0:
            blocks.append(self.failures)
            blocks.append(self.category_failures)
            for block in self.model_failures:
                blocks.append(block)
        else:
            blocks.append(self.no_failures)

        return json.dumps(blocks)

    @staticmethod
    def error_out(title, ci_title="", runner_not_available=False, runner_failed=False, setup_failed=False):
        blocks = []
        title_block = {"type": "header", "text": {"type": "plain_text", "text": title}}
        blocks.append(title_block)

        if ci_title:
            ci_title_block = {"type": "section", "text": {"type": "mrkdwn", "text": ci_title}}
            blocks.append(ci_title_block)

        offline_runners = []
        if runner_not_available:
            text = "💔 CI runners are not available! Tests are not run. 😭"
            result = os.environ.get("OFFLINE_RUNNERS")
            if result is not None:
                offline_runners = json.loads(result)
        elif runner_failed:
            text = "💔 CI runners have problems! Tests are not run. 😭"
        elif setup_failed:
            text = "💔 Tests are not run. 😭"
        else:
            text = "💔 There was an issue running the tests. 😭"

        error_block_1 = {
            "type": "header",
            "text": {
                "type": "plain_text",
                "text": text,
            },
        }

        text = ""
        if len(offline_runners) > 0:
            text = "\n • " + "\n • ".join(offline_runners)
            text = f"The following runners are offline:\n{text}\n\n"
        text += "🙏 Let's fix it ASAP! 🙏"

        error_block_2 = {
            "type": "section",
            "text": {
                "type": "plain_text",
                "text": text,
            },
            "accessory": {
                "type": "button",
                "text": {"type": "plain_text", "text": "Check Action results", "emoji": True},
                "url": f"https://github.com/huggingface/optimum-amd/actions/runs/{os.environ['GITHUB_RUN_ID']}",
            },
        }
        blocks.extend([error_block_1, error_block_2])

        payload = json.dumps(blocks)

        print("Sending the following payload")
        print(json.dumps({"blocks": blocks}))

        client.chat_postMessage(
            channel=os.environ["CI_SLACK_CHANNEL_ID"],
            text=text,
            blocks=payload,
        )
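
    # Note: Slack rejects section texts of 3001+ characters, which is why
    # `get_reply_blocks` below and the module-level `prepare_reports` both
    # truncate long reports and append "[Truncated]".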

    def get_reply_blocks(self, job_name, failure, text):
        """
        failure: A failure of the form {"line": full test name, "trace": error trace}
        """
        # `text` must be less than 3001 characters in Slack SDK
        # keep some room for adding "[Truncated]" when necessary
        MAX_ERROR_TEXT = 3000 - len("[Truncated]")

        failure_text = ""
        new_text = failure_text + f'*{failure["line"]}*\n_{failure["trace"]}_\n\n'
        if len(new_text) > MAX_ERROR_TEXT:
            # `failure_text` here has length <= 3000
            failure_text = new_text[:MAX_ERROR_TEXT] + "[Truncated]"
        else:
            # `failure_text` here has length <= MAX_ERROR_TEXT
            failure_text = new_text

        title = job_name

        return [
            {"type": "header", "text": {"type": "plain_text", "text": title.upper(), "emoji": True}},
            {"type": "section", "text": {"type": "mrkdwn", "text": failure_text}},
        ]

    def post(self):
        payload = self.payload
        print("Sending the following payload")
        print(json.dumps({"blocks": json.loads(payload)}))

        text = f"{self.n_failures} failures out of {self.n_tests} tests," if self.n_failures else "All tests passed."

        self.thread_ts = client.chat_postMessage(
            channel=os.environ["CI_SLACK_CHANNEL_ID"],
            blocks=payload,
            text=text,
        )

    def post_reply(self):
        if self.thread_ts is None:
            raise ValueError("Can only post reply if a post has been made.")

        for key, result in self.model_results.items():
            for failure in result["failures"]:
                blocks = self.get_reply_blocks(key, failure, text="test failure")

                print("Sending the following reply")
                print(json.dumps({"blocks": blocks}))

                client.chat_postMessage(
                    channel=os.environ["CI_SLACK_CHANNEL_ID"],
                    text=f"Results for {key}",  # fixed: originally referenced `job`, which is not defined in this method
                    blocks=blocks,
                    thread_ts=self.thread_ts["ts"],
                )

                time.sleep(1)


def retrieve_artifact(artifact_path: str):
    _artifact = {}

    if os.path.exists(artifact_path):
        files = os.listdir(artifact_path)
        for file in files:
            try:
                with open(os.path.join(artifact_path, file)) as f:
                    _artifact[file.split(".")[0]] = f.read()
            except UnicodeDecodeError as e:
                raise ValueError(f"Could not open {os.path.join(artifact_path, file)}.") from e

    return _artifact


def retrieve_available_artifacts():
    class Artifact:
        def __init__(self, name: str):
            self.name = name

        def __str__(self):
            return self.name

        def add_path(self, path: str):
            self.path = path

    _available_artifacts: Dict[str, Artifact] = {}

    directories = filter(os.path.isdir, os.listdir())
    for directory in directories:
        if directory == "reports":
            for artifact_name in os.listdir(directory):
                _available_artifacts[artifact_name] = Artifact(artifact_name)
                _available_artifacts[artifact_name].add_path(os.path.join(directory, artifact_name))

    return _available_artifacts


def prepare_reports(title, header, reports, to_truncate=True):
    report = ""

    MAX_ERROR_TEXT = 3000 - len("[Truncated]")
    if not to_truncate:
        MAX_ERROR_TEXT = float("inf")

    if len(reports) > 0:
        # `text` must be less than 3001 characters in Slack SDK
        # keep some room for adding "[Truncated]" when necessary
        for idx in range(len(reports)):
            _report = header + "\n".join(reports[: idx + 1])
            if title:
                new_report = f"{title}\n```\n{_report}\n```\n"
            else:
                new_report = f"```\n{_report}\n```\n"
            if len(new_report) > MAX_ERROR_TEXT:
                # `report` here has length <= 3000
                report = report + "[Truncated]"
                break
            report = new_report

    return report
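

# Illustrative sketch (never called by the CI flow; title, header and row are
# hypothetical): `prepare_reports` grows the report row by row and stops with
# "[Truncated]" once the Slack limit would be exceeded.
def _example_prepare_reports():
    header = "Failed | Success | Category \n"
    rows = ["     1 |       9 | Pre-Quantized Model"]
    # Short input, so nothing is truncated:
    # -> "*Summary*\n```\nFailed | Success | Category \n     1 |       9 | Pre-Quantized Model\n```\n"
    return prepare_reports(title="*Summary*", header=header, reports=rows)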


if __name__ == "__main__":
    # runner_not_available = True if runner_status is not None and runner_status != "success" else False
    # runner_failed = True if runner_env_status is not None and runner_env_status != "success" else False
    # Let's keep the lines regarding runners' status (we might be able to use them again in the future)
    runner_not_available = False
    runner_failed = False

    org = "huggingface"
    repo = "optimum-amd"
    repository_full_name = f"{org}/{repo}"

    # This env. variable is set in workflow file (under the job `send_results`).
    ci_event = os.environ["CI_EVENT"]

    # To find the PR number in a commit title, for example, `Add AwesomeFormer model (#99999)`
    pr_number_re = re.compile(r"\(#(\d+)\)$")

    title = f"🤗 Results of the {ci_event} tests."

    # Add Commit/PR title with a link for push CI
    # (check the title in 2 env. variables - depending on whether the CI is triggered via `push` or `workflow_run` event)
    ci_title_push = os.environ.get("CI_TITLE_PUSH")
    ci_title_workflow_run = os.environ.get("CI_TITLE_WORKFLOW_RUN")
    ci_title = ci_title_push if ci_title_push else ci_title_workflow_run

    ci_sha = os.environ.get("CI_SHA")

    ci_url = None
    if ci_sha:
        ci_url = f"https://github.com/{repository_full_name}/commit/{ci_sha}"

    if ci_title is not None:
        if ci_url is None:
            raise ValueError(
                "When a title is found (`ci_title`), it means a `push` event or a `workflow_run` event (triggered by "
                "another `push` event), and the commit SHA has to be provided in order to create the URL to the "
                "commit page."
            )
        ci_title = ci_title.strip().split("\n")[0].strip()

        # Retrieve the PR title and author login to complete the report
        commit_number = ci_url.split("/")[-1]
        ci_detail_url = f"https://api.github.com/repos/{repository_full_name}/commits/{commit_number}"
        ci_details = requests.get(ci_detail_url).json()
        ci_author = ci_details["author"]["login"]

        merged_by = None
        # Find the PR number (if any) and change the url to the actual PR page.
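        # e.g. a squash-merge commit title "Add AwesomeFormer model (#99999)" yields ["99999"]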
        numbers = pr_number_re.findall(ci_title)
        if len(numbers) > 0:
            pr_number = numbers[0]
            ci_detail_url = f"https://api.github.com/repos/{repository_full_name}/pulls/{pr_number}"
            ci_details = requests.get(ci_detail_url).json()

            ci_author = ci_details["user"]["login"]
            ci_url = f"https://github.com/{repository_full_name}/pull/{pr_number}"

            merged_by = ci_details["merged_by"]["login"]

        if merged_by is None:
            ci_title = f"<{ci_url}|{ci_title}>\nAuthor: {ci_author}"
        else:
            ci_title = f"<{ci_url}|{ci_title}>\nAuthor: {ci_author} | Merged by: {merged_by}"

    elif ci_sha:
        ci_title = f"<{ci_url}|commit: {ci_sha}>"

    else:
        ci_title = ""

    if runner_not_available or runner_failed:
        Message.error_out(title, ci_title, runner_not_available, runner_failed)
        exit(0)

    github_actions_jobs = get_jobs(
        workflow_run_id=os.environ["GITHUB_RUN_ID"],
        # token=os.environ["ACCESS_REPO_INFO_TOKEN"],
    )

    artifact_name_to_job_map = {}
    for job in github_actions_jobs:
        artifact_name = job["name"].split(" ")[0]
        artifact_name_to_job_map[artifact_name] = job

    available_artifacts = retrieve_available_artifacts()
    if len(available_artifacts) == 0:
        Message.error_out(title, ci_title, runner_not_available, runner_failed, setup_failed=True)
        exit(0)

    test_categories = {
        "Pre-Quantized Model": "run_tests_prequantized_models",
        "Timm Quantization": "run_tests_quantization",
    }

    results = {
        key: {
            "failed": 0,
            "success": 0,
            "time_spent": "",
            "error": False,
            "failures": [],
            "job_link": 0,
        }
        for key in test_categories.keys()
    }

    for key in results.keys():
        if test_categories[key] not in available_artifacts:
            results[key]["error"] = True
            continue

        artifact_path = available_artifacts[test_categories[key]].path
        job = artifact_name_to_job_map[test_categories[key]]
        results[key]["job_link"] = job["html_url"]

        artifact = retrieve_artifact(artifact_path)

        stacktraces = handle_stacktraces(artifact["failures_line"])

        failed, success, time_spent = handle_test_results(artifact["stats"])
        results[key]["failed"] = failed
        results[key]["success"] = success
        results[key]["time_spent"] = time_spent[1:-1] + ", "

        if len(artifact["errors"]):
            results[key]["error"] = True

        if failed:
            for line in artifact["summary_short"].split("\n"):
                if line.startswith("FAILED "):
                    line = line[len("FAILED ") :]
                    line = line.split()[0].replace("\n", "")
                    results[key]["failures"].append({"line": line, "trace": stacktraces.pop(0)})

    message = Message(
        title,
        ci_title,
        results,
    )

    message.post()
    message.post_reply()