analytics/circleci_analyze.py

#!/usr/bin/env python3.7
"""Download, cache, and analyze CircleCI pipeline/workflow/job data for pytorch/pytorch."""
from datetime import datetime, time
import json
import requests
import itertools
import sqlite3
import os
import sys
from typing import Callable, Dict, Generator, List, MutableSet, Optional


def get_executor_price_rate(executor):
    # Returns the CircleCI credits-per-minute rate for the given executor type/resource class
    (etype, eclass) = executor['type'], executor['resource_class']
    assert etype in ['machine', 'external', 'docker', 'macos', 'runner'], f'Unexpected type {etype}:{eclass}'
    if etype == 'machine':
        return {
            'medium': 10,
            'large': 20,
            'xlarge': 100,
            '2xlarge': 200,
            'gpu.medium': 160,
            'gpu.large': 320,
            'gpu.small': 80,
            'windows.medium': 40,
            'windows.large': 120,
            'windows.xlarge': 210,
            'windows.2xlarge': 500,
            'windows.gpu.nvidia.medium': 500,
            'gpu.nvidia.small': 160,
            'gpu.nvidia.medium': 240,
            'gpu.nvidia.large': 1000,
        }[eclass]
    if etype == 'macos':
        return {
            'medium': 50,
            'large': 100,
        }[eclass]
    if etype == 'docker':
        return {
            'small': 5,
            'medium': 10,
            'medium+': 15,
            'large': 20,
            'xlarge': 40,
            '2xlarge': 80,
            '2xlarge+': 100,
        }[eclass]
    if etype == 'runner' or etype == 'external':
        return {
            'pytorch/amd-gpu': 0,
        }[eclass]
    raise RuntimeError(f'Undefined executor {etype}:{eclass}')


price_per_credit = 6e-4  # USD per CircleCI credit


def get_circleci_token() -> str:
    token_file_path = os.path.join(os.getenv('HOME'), '.circleci_token')
    token = os.getenv('CIRCLECI_TOKEN')
    if token is not None:
        return token
    if not os.path.exists(token_file_path):
        raise RuntimeError('Cannot get CircleCI token'
                           ' from either the CIRCLECI_TOKEN environment variable'
                           ' or the ~/.circleci_token file')
    with open(token_file_path) as f:
        return f.read().strip()


def is_workflow_in_progress(workflow: Dict) -> bool:
    return workflow['status'] in ['running', 'not_run', 'failing', 'on_hold']


def str2date(val: str) -> datetime:
    assert val is not None
    return datetime.fromisoformat(val[:-1] if val.endswith('Z') else val)


class CircleCICache:
    def __init__(self, token: Optional[str], db_name: str = 'circleci-cache.db') -> None:
        file_folder = os.path.dirname(__file__)
        self.url_prefix = 'https://circleci.com/api/v2'
        self.session = requests.session()
        self.headers = {
            'Accept': 'application/json',
            'Circle-Token': token,
        } if token is not None else None
        # Local sqlite cache of API responses
        self.db = sqlite3.connect(os.path.join(file_folder, db_name))
        self.db.execute('CREATE TABLE IF NOT EXISTS jobs(slug TEXT NOT NULL, job_id INTEGER NOT NULL, json TEXT NOT NULL);')
        self.db.execute('CREATE TABLE IF NOT EXISTS artifacts(slug TEXT NOT NULL, job_id INTEGER NOT NULL, json TEXT NOT NULL);')
        self.db.execute('CREATE UNIQUE INDEX IF NOT EXISTS jobs_key on jobs(slug, job_id);')
        self.db.execute('CREATE TABLE IF NOT EXISTS workflows(id TEXT NOT NULL PRIMARY KEY, json TEXT NOT NULL);')
        self.db.execute('CREATE TABLE IF NOT EXISTS pipeline_workflows(id TEXT NOT NULL PRIMARY KEY, json TEXT NOT NULL);')
        self.db.execute('CREATE TABLE IF NOT EXISTS pipelines(id TEXT NOT NULL PRIMARY KEY, json TEXT NOT NULL, branch TEXT, revision TEXT);')
        self.db.commit()

    def is_offline(self) -> bool:
        return self.headers is None

    def _get_paged_items_list(self, url: str, params: Optional[Dict] = None, item_count: Optional[int] = -1) -> List:
        # Fetch items from a paginated CircleCI API v2 endpoint by following next_page_token
        rc, token, run_once = [], None, False

        def _should_quit():
            nonlocal run_once, rc, token
            if not run_once:
                run_once = True
                return False
            if token is None:
                return True
            if item_count is None:
                return True
            return item_count >= 0 and len(rc) >= item_count

        if params is None:
            params = {}
        while not _should_quit():
            if token is not None:
                params['page-token'] = token
            r = self.session.get(url, params=params, headers=self.headers)
            try:
                j = r.json()
            except json.JSONDecodeError:
                print(f"Failed to decode {rc}", file=sys.stderr)
                raise
            if 'message' in j:
                raise RuntimeError(f'Failed to get list from {url}: {j["message"]}')
            token = j['next_page_token']
            rc.extend(j['items'])
        return rc

    def get_pipelines(self, project: str = 'github/pytorch/pytorch', branch: Optional[str] = None, item_count: Optional[int] = None) -> List:
        if self.is_offline():
            c = self.db.cursor()
            cmd = "SELECT json from pipelines"
            if branch is not None:
                cmd += f" WHERE branch='{branch}'"
            if item_count is not None and item_count > 0:
                cmd += f" LIMIT {item_count}"
            c.execute(cmd)
            return [json.loads(val[0]) for val in c.fetchall()]
        rc = self._get_paged_items_list(f'{self.url_prefix}/project/{project}/pipeline', {'branch': branch} if branch is not None else {}, item_count)
        for pipeline in rc:
            vcs = pipeline['vcs']
            pid, branch, revision, pser = pipeline['id'], vcs['branch'], vcs['revision'], json.dumps(pipeline)
            self.db.execute("INSERT OR REPLACE INTO pipelines(id, branch, revision, json) VALUES (?, ?, ?, ?)", (pid, branch, revision, pser))
        self.db.commit()
        return rc

    def get_pipeline_workflows(self, pipeline) -> List:
        c = self.db.cursor()
        c.execute("SELECT json FROM pipeline_workflows WHERE id=?", (pipeline,))
        rc = c.fetchone()
        if rc is not None:
            rc = json.loads(rc[0])
            if not any(is_workflow_in_progress(w) for w in rc) or self.is_offline():
                return rc
        if self.is_offline():
            return []
        rc = self._get_paged_items_list(f'{self.url_prefix}/pipeline/{pipeline}/workflow')
        self.db.execute("INSERT OR REPLACE INTO pipeline_workflows(id, json) VALUES (?, ?)", (pipeline, json.dumps(rc)))
        self.db.commit()
        return rc

    def get_workflow_jobs(self, workflow, should_cache=True) -> List:
        c = self.db.cursor()
        c.execute("select json from workflows where id=?", (workflow,))
        rc = c.fetchone()
        if rc is not None:
            return json.loads(rc[0])
        if self.is_offline():
            return []
        rc = self._get_paged_items_list(f'{self.url_prefix}/workflow/{workflow}/job')
        if should_cache:
            self.db.execute("INSERT INTO workflows(id, json) VALUES (?, ?)", (workflow, json.dumps(rc)))
            self.db.commit()
        return rc

    def get_job(self, project_slug, job_number) -> Dict:
        c = self.db.cursor()
        c.execute("select json from jobs where slug=? and job_id = ?", (project_slug, job_number))
        rc = c.fetchone()
        if rc is not None:
            return json.loads(rc[0])
        if self.is_offline():
            return {}
        r = self.session.get(f'{self.url_prefix}/project/{project_slug}/job/{job_number}', headers=self.headers)
        try:
            rc = r.json()
        except json.JSONDecodeError:
            print(f"Failed to decode {rc}", file=sys.stderr)
            raise
        self.db.execute("INSERT INTO jobs(slug, job_id, json) VALUES (?, ?, ?)", (project_slug, job_number, json.dumps(rc)))
        self.db.commit()
        return rc

    def get_job_artifacts(self, project_slug, job_number) -> List[Dict]:
        c = self.db.cursor()
        c.execute("select json from artifacts where slug=? and job_id = ?", (project_slug, job_number))
        rc = c.fetchone()
        if rc is not None:
            return json.loads(rc[0])
        if self.is_offline():
            return [{}]
        rc = self._get_paged_items_list(f"{self.url_prefix}/project/{project_slug}/{job_number}/artifacts")
        self.db.execute("INSERT INTO artifacts(slug, job_id, json) VALUES (?, ?, ?)", (project_slug, job_number, json.dumps(rc)))
        self.db.commit()
        return rc

    def get_pipeline_jobs(self, project: str = 'github/pytorch/pytorch', branch: Optional[str] = None, item_count: Optional[int] = None) -> Generator:
        for pipeline in self.get_pipelines(project, branch, item_count):
            for workflow in self.get_pipeline_workflows(pipeline['id']):
                in_progress = is_workflow_in_progress(workflow)
                for job in self.get_workflow_jobs(workflow['id'], should_cache=not in_progress):
                    yield (pipeline, workflow, job)

    def get_jobs_summary(self, slug='gh/pytorch/pytorch', workflow='build') -> Dict:
        items = self._get_paged_items_list(f'{self.url_prefix}/insights/{slug}/workflows/{workflow}/jobs')
        return {item['name']: item for item in items}

    def get_job_timeseries(self, job_name: str,
                           slug: str = 'gh/pytorch/pytorch',
                           workflow: str = 'build',
                           branch: Optional[str] = None) -> List:
        params = {'branch': branch} if branch is not None else {}
        items = self._get_paged_items_list(f'{self.url_prefix}/insights/{slug}/workflows/{workflow}/jobs/{job_name}', params)
        return [(str2date(x['started_at']), x['duration']) for x in items if x['status'] == 'success']


def aggregate_by_day(series):
    rc = {}
    for (ts, val) in series:
        date = datetime.combine(ts.date(), time())
        valcount = [val, 1.0]
        if date not in rc:
            rc[date] = valcount
        else:
            rc[date] = [sum(x) for x in zip(rc[date], valcount)]
    return [(x, rc[x][0] / rc[x][1]) for x in sorted(rc.keys())]


def filter_names(names: List[str], name_filter: Optional[str] = None) -> List[str]:
    import re
    if name_filter is None:
        return names
    filters = name_filter.split(",")
    return [name for name in names if any(re.match(filter, name) for filter in filters)]


def common_prefix(names: List[str]) -> str:
    if len(names) == 0 or len(names[0]) == 0:
        return ''
    if len(names) == 1:
        return names[0]
    rc = names[0][0]
    while rc != names[0] and all(name.startswith(rc) for name in names[1:]):
        rc = names[0][:len(rc) + 1]
    return rc[:-1]


def plot_graph(name_filter: Optional[str] = None, output_file: Optional[str] = None, branch: Optional[str] = None) -> None:
    import matplotlib.pyplot as plt
    import matplotlib.dates as mdates

    ci_cache = CircleCICache(token=get_circleci_token())
    summary = ci_cache.get_jobs_summary()
    test_jobs = [name for name in summary.keys() if name.startswith('pytorch') and 'test' in name]
    filtered_jobs = filter_names(test_jobs, name_filter)
    prefix = common_prefix(filtered_jobs)
    if len(filtered_jobs) == 0:
        print(f'Filter "{name_filter}" does not match any of {test_jobs}')
        return
    series = []
    labels = []
    styles = [f'{color}{style}' for (style, color) in itertools.product(['-', '--', '-.', ':'], ['b', 'g', 'r', 'c', 'm', 'y', 'k'])]
    fig, ax = plt.subplots()
    for name in test_jobs:
        label = f"{name}(p95 = {int(summary[name]['metrics']['duration_metrics']['p95']/60)} min)"
        if name not in filtered_jobs:
            print(label)
            continue
        ts = ci_cache.get_job_timeseries(name, branch=branch)
        if len(ts) == 0:
            print(f'{label} time series is empty!')
            continue
        print(f'{label} time series has {len(ts)} elements')
        labels.append(label[len(prefix):])
        series.append(ts)
        x, y = zip(*aggregate_by_day(ts))
        plt.plot(x, [i / 60.0 for i in y], styles[len(labels) % len(styles)])
    plt.legend(labels, loc='upper left')
    plt.title(f'{prefix} timeseries')
    ax.set_ylabel("Duration (m)")
    # Format date
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%b %d'))
    # Rotate tick labels
    plt.setp(ax.get_xticklabels(), rotation=45, ha='right', rotation_mode='anchor')
    if output_file is not None:
        plt.savefig(output_file)
    else:
        plt.show()


def print_line(line: str, padding: Optional[int] = None, newline: bool = True) -> None:
    if padding is not None and len(line) < padding:
        line += ' ' * (padding - len(line))
    print(line, end='\n' if newline else '\r', flush=True)


def fetch_status(branch=None, item_count=50):
    isatty = sys.stdout.isatty()
    padding = os.get_terminal_size().columns - 1 if isatty else None
    ci_cache = CircleCICache(token=get_circleci_token())
    print(f"About to fetch {item_count} latest pipelines against {branch if branch is not None else 'all branches'}")
    pipelines = ci_cache.get_pipelines(branch=branch, item_count=item_count)
    total_price, total_master_price = 0, 0
    for pipeline_idx, pipeline in enumerate(pipelines):
        revision = pipeline['vcs']['revision']
        branch = pipeline['vcs']['branch']
        workflows = ci_cache.get_pipeline_workflows(pipeline['id'])
        known_job_ids = []
        for workflow in workflows:
            url = f'https://app.circleci.com/pipelines/github/pytorch/pytorch/{workflow["pipeline_number"]}/workflows/{workflow["id"]}'
            if is_workflow_in_progress(workflow):
                print_line(f'Skipping {url} name:{workflow["name"]} status:{workflow["status"]}',
                           newline=not sys.stdout.isatty())
                continue
            rerun = False
            total_credits, test_credits, gpu_credits, wincpu_credits, wingpu_credits = 0, 0, 0, 0, 0
            jobs = ci_cache.get_workflow_jobs(workflow['id'])
            for job in jobs:
                job_name, job_status, job_number = job['name'], job['status'], job.get('job_number', None)
                if job_status in ['blocked', 'canceled', 'unauthorized', 'running', 'not_run', 'failing']:
                    continue
                if job_number is None:
                    print(job)
                    continue
                if job_number in known_job_ids:
                    rerun = True
                    continue
                job_info = ci_cache.get_job(job['project_slug'], job_number)
                if 'executor' not in job_info:
                    print(f'executor not found in {job_info}')
                    continue
                job_executor = job_info['executor']
                resource_class = job_executor['resource_class']
                if resource_class is None:
                    print(f'resource_class is None for {job_info}')
                    continue
                job_on_gpu = 'gpu' in resource_class
                job_on_win = 'windows' in resource_class
                if job_status != 'infrastructure_fail':
                    duration = str2date(job_info['stopped_at']) - str2date(job_info['started_at'])
                    job_credits = get_executor_price_rate(job_executor) * int(job_info['duration']) * 1e-3 / 60
                else:
                    job_credits, duration = 0, 0
                job_cost = job_credits * price_per_credit
                total_credits += job_credits
                if 'test' in job_name or job_name.startswith('smoke_'):
                    test_credits += job_credits
                elif job_on_gpu:
                    print(f'Running build job {job_name} on GPU!!!')
                if job_on_gpu:
                    gpu_credits += job_credits
                    if job_on_win:
                        wingpu_credits += job_credits
                if job_on_win and not job_on_gpu:
                    wincpu_credits += job_credits
                known_job_ids.append(job_number)
                print_line(f' {job_name} {job_status} {duration} ${job_cost:.2f}', padding=padding, newline=not isatty)
            # Increment totals
            total_price += total_credits * price_per_credit
            if branch in ['master', 'nightly', 'postnightly', 'release/1.6']:
                total_master_price += total_credits * price_per_credit
            # Skip small jobs
            if total_credits * price_per_credit < .1:
                continue
            workflow_status = f'[{pipeline_idx}/{len(pipelines)}]'
            workflow_status += f' {url} {workflow["name"]} status:{workflow["status"]}'
            workflow_status += f' price: ${total_credits * price_per_credit:.2f}'
            workflow_status += ' (Rerun?)' if rerun else ''
            workflow_status += f'\n\t\tdate: {workflow["created_at"]} branch:{branch} revision:{revision}'
            workflow_status += f'\n\t\ttotal credits: {int(total_credits)}'
            if test_credits != 0:
                workflow_status += f' testing: {100 * test_credits / total_credits:.1f}%'
            if gpu_credits != 0:
                workflow_status += f' GPU testing: {100 * gpu_credits / total_credits:.1f}%'
                if wingpu_credits != 0:
                    workflow_status += f' WINGPU/GPU: {100 * wingpu_credits / gpu_credits:.1f}%'
            if wincpu_credits != 0:
                workflow_status += f' Win CPU: {100 * wincpu_credits / total_credits:.1f}%'
            workflow_status += f' Total: ${total_price:.2f} master fraction: {100 * total_master_price / total_price:.1f}%'
            print_line(workflow_status, padding=padding)


def plot_heatmap(cov_matrix, names):
    import numpy as np
    import matplotlib.pyplot as plt
    assert cov_matrix.shape == (len(names), len(names))
    fig, ax = plt.subplots()
    ax.imshow(cov_matrix)
    ax.set_xticks(np.arange(len(names)))
    ax.set_yticks(np.arange(len(names)))
    ax.set_xticklabels(names)
    ax.set_yticklabels(names)
    # Rotate tick labels
    plt.setp(ax.get_xticklabels(), rotation=45, ha='right', rotation_mode='anchor')
    # Annotate values
    for i in range(len(names)):
        for j in range(len(names)):
            ax.text(j, i, f'{cov_matrix[i, j]:.2f}', ha='center', va='center', color='w')
    plt.show()


def filter_service_jobs(name):
    if name.startswith('docker'):
        return True
    if name.startswith('binary'):
        return True
    return False


def filter_cuda_test(name):
    if filter_service_jobs(name):
        return False
    if 'libtorch' in name:
        return False
    if 'test' not in name:
        return False
    # Skip jit-profiling tests
    if 'jit-profiling' in name:
        return False
    if 'cuda11' in name:
        return False
    # Skip VS2017 tests
    if 'vs2017' in name:
        return False
    return 'cuda' in name and 'nogpu' not in name


def filter_cuda_build(name):
    if filter_service_jobs(name):
        return False
    if 'libtorch' in name:
        return False
    return 'cuda' in name and name.endswith('build')


def filter_windows_test(name):
    if filter_service_jobs(name):
        return False
    # Skip jit-profiling tests
    if 'jit-profiling' in name:
        return False
    return 'test' in name and 'windows' in name


def compute_covariance(branch='master', name_filter: Optional[Callable[[str], bool]] = None):
    import numpy as np
    revisions: MutableSet[str] = set()
    job_summary: Dict[str, Dict[str, float]] = {}

    # Extract data
    print(f"Computing covariance for {branch if branch is not None else 'all branches'}")
    ci_cache = CircleCICache(None)
    pipelines = ci_cache.get_pipelines(branch=branch)
    for pipeline in pipelines:
        if pipeline['trigger']['type'] == 'schedule':
            continue
        revision = pipeline['vcs']['revision']
        pipeline_jobs: Dict[str, float] = {}
        blocked_jobs: MutableSet[str] = set()
        workflows = ci_cache.get_pipeline_workflows(pipeline['id'])
        for workflow in workflows:
            if is_workflow_in_progress(workflow):
                continue
            jobs = ci_cache.get_workflow_jobs(workflow['id'])
            for job in jobs:
                job_name = job['name']
                job_status = job['status']
                # Handle renames
                if job_name == 'pytorch_linux_xenial_cuda10_1_cudnn7_py3_NO_AVX2_test':
                    job_name = 'pytorch_linux_xenial_cuda10_1_cudnn7_py3_nogpu_NO_AVX2_test'
                if job_name == 'pytorch_linux_xenial_cuda10_1_cudnn7_py3_NO_AVX_NO_AVX2_test':
                    job_name = 'pytorch_linux_xenial_cuda10_1_cudnn7_py3_nogpu_NO_AVX_test'
                if job_status in ['infrastructure_fail', 'canceled']:
                    continue
                if callable(name_filter) and not name_filter(job_name):
                    continue
                if job_status == 'blocked':
                    blocked_jobs.add(job_name)
                    continue
                if job_name in blocked_jobs:
                    blocked_jobs.remove(job_name)
                result = 1.0 if job_status == 'success' else -1.0
                pipeline_jobs[job_name] = result
        # Skip build with blocked job [which usually means build failed due to the test failure]
        if len(blocked_jobs) != 0:
            continue
        # Skip all success workflows
        if all(result == 1.0 for result in pipeline_jobs.values()):
            continue
        revisions.add(revision)
        for job_name in pipeline_jobs:
            if job_name not in job_summary:
                job_summary[job_name] = {}
            job_summary[job_name][revision] = pipeline_jobs[job_name]
    # Analyze results
    job_names = sorted(job_summary.keys())
    # revisions = sorted(revisions)
    job_data = np.zeros((len(job_names), len(revisions)), dtype=float)
    print(f"Number of observations: {len(revisions)}")
    for job_idx, job_name in enumerate(job_names):
        job_row = job_summary[job_name]
        for rev_idx, revision in enumerate(revisions):
            if revision in job_row:
                job_data[job_idx, rev_idx] = job_row[revision]
        success_rate = job_data[job_idx, ].sum(where=job_data[job_idx, ] > 0.0) / len(job_row)
        present_rate = 1.0 * len(job_row) / len(revisions)
        print(f"{job_name}: missing {100.0 * (1.0 - present_rate):.2f}% success rate: {100 * success_rate:.2f}%")
    cov_matrix = np.corrcoef(job_data)
    plot_heatmap(cov_matrix, job_names)


def print_artifacts(branch, item_count, name_filter: Callable[[str], bool]) -> None:
    ci_cache = CircleCICache(token=get_circleci_token())
    for pipeline, _, job in ci_cache.get_pipeline_jobs(branch=branch, item_count=item_count):
        revision = pipeline['vcs']['revision']
        if not name_filter(job["name"]):
            continue
        job_number = job.get("job_number")
        if job_number is None:
            continue
        artifacts = ci_cache.get_job_artifacts('gh/pytorch/pytorch', job_number)
        for artifact in artifacts:
            name = os.path.basename(artifact['path'])
            url = artifact["url"]
            print(f"{revision} {name} {url}")


def print_duration(branch, item_count, name_filter: Callable[[str], bool]) -> None:
    ci_cache = CircleCICache(token=get_circleci_token())
    for pipeline, workflow, job in ci_cache.get_pipeline_jobs(branch=branch, item_count=item_count):
        job_name, job_status, job_number = job['name'], job['status'], job.get("job_number")
        revision = pipeline['vcs']['revision']
        if not name_filter(job_name) or job_number is None:
            continue
        if job_status in ['blocked', 'canceled', 'unauthorized', 'running', 'not_run', 'failing']:
            continue
        started_at = str2date(job['started_at'])
        stopped_at = str2date(job['stopped_at'])
        duration = stopped_at - started_at
        print(f"{job_name} {revision} {duration} {started_at}")


def parse_arguments():
    from argparse import ArgumentParser
    parser = ArgumentParser(description="Download and analyze circle logs")
    parser.add_argument('--plot-graph', type=str, nargs='?', help="Plot job time trends", const='')
    parser.add_argument('--output', type=str, help="Output file name for the graphs")
    parser.add_argument('--get_artifacts', type=str)
    parser.add_argument('--print-duration', type=str)
    parser.add_argument('--branch', type=str)
    parser.add_argument('--item_count', type=int, default=100)
    parser.add_argument('--compute_covariance', choices=['cuda_test', 'cuda_build', 'windows_test'])
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_arguments()
    if args.get_artifacts is not None:
        print_artifacts(branch=args.branch,
                        item_count=args.item_count,
                        name_filter=lambda x: args.get_artifacts in x)
        sys.exit(0)
    if args.print_duration is not None:
        print_duration(branch=args.branch,
                       item_count=args.item_count,
                       name_filter=lambda x: args.print_duration in x)
        sys.exit(0)
    if args.compute_covariance is not None:
        name_filter = {
            'cuda_test': filter_cuda_test,
            'cuda_build': filter_cuda_build,
            'windows_test': filter_windows_test,
        }[args.compute_covariance]
        compute_covariance(branch=args.branch, name_filter=name_filter)
        sys.exit(0)
    if args.plot_graph is not None:
        plot_graph(args.plot_graph, args.output, args.branch)
        sys.exit(0)
    fetch_status(branch=args.branch, item_count=args.item_count)
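
# Illustrative invocations only (regex filters and output paths below are hypothetical,
# not taken from this repository); most modes assume a token in $CIRCLECI_TOKEN or
# ~/.circleci_token, while --compute_covariance runs offline against the sqlite cache:
#   ./circleci_analyze.py --branch master --item_count 50
#   ./circleci_analyze.py --plot-graph 'pytorch_linux.*test' --output trends.png
#   ./circleci_analyze.py --print-duration pytorch_linux --branch master
#   ./circleci_analyze.py --compute_covariance cuda_test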