analytics/s3_test_stats_analyze.py

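"""Summarize PyTorch test time statistics stored in the ossci-metrics S3 bucket.

The script downloads the raw per-commit test-time reports for recent commits
(--lookback hours, 24 by default), flattens them at the requested --granularity
('file', 'suite', or 'case'), and pickles the resulting pandas DataFrame to
--output ('cache/df.pickle' by default).

Usage sketch (assuming the defaults defined in main() below):

    python analytics/s3_test_stats_analyze.py --lookback 24 --granularity file
"""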
import argparse
import boto3
import bz2
import json
import os
import re
import requests
import pandas as pd

from datetime import datetime, timedelta
from tqdm import tqdm
from typing import Any, Dict, Optional, List

S3 = boto3.resource('s3')
CLIENT = boto3.client('s3')
BUCKET = S3.Bucket('ossci-metrics')

GITHUB_API_BASE = "https://api.github.com/"
GITHUB_COMMITS_API = "repos/pytorch/pytorch/commits"
STRF_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

CACHE_PICKLE = "cache/test_time/dataframe.pickle"


def _get_latests_git_commit_sha_list(lookback: int):
    # List the SHAs of pytorch/pytorch commits pushed within the last `lookback` hours.
    sha_since = (datetime.utcnow() - timedelta(hours=lookback)).strftime(STRF_FORMAT)
    resp = requests.get(GITHUB_API_BASE + GITHUB_COMMITS_API + f"?since={sha_since}")
    if resp.status_code == 200:
        return [e.get('sha') for e in resp.json()]
    else:
        return []


def _json_to_df(data: Dict[str, Any], granularity: str) -> pd.DataFrame:
    # Flatten one test-time report into rows at the requested granularity
    # ('file', 'suite', or 'case'), tagging each row with the job name and commit sha.
    reformed_data = list()
    for fname, fdata in data['files'].items():
        if granularity == 'file':
            reformed_data.append({
                "job": data['job'],
                "sha": data['sha'],
                'file': fname,
                'file_total_sec': fdata['total_seconds'],
            })
        else:
            for sname, sdata in fdata['suites'].items():
                if granularity == 'suite':
                    reformed_data.append({
                        "job": data['job'],
                        "sha": data['sha'],
                        'suite': sname,
                        'suite_total_sec': sdata['total_seconds'],
                    })
                else:
                    for cname, cdata in sdata['cases'].items():
                        reformed_data.append({
                            "job": data['job'],
                            "sha": data['sha'],
                            'case': cname,
                            'case_status': cdata['status'],
                            'case_sec': cdata['seconds'],
                        })
    df = pd.json_normalize(reformed_data)
    return df


def download_stats(folder: str, lookback: int):
    # Download the raw test-time reports for recent commits from S3 into `folder`,
    # skipping files that are already cached locally.
    commit_sha_list = _get_latests_git_commit_sha_list(lookback)
    for commit_sha in commit_sha_list:
        for key in tqdm(BUCKET.objects.filter(Prefix=f'test_time/{commit_sha}')):
            remote_fname = key.key
            local_fname = os.path.join(folder, remote_fname)
            # TODO: Do this in parallel
            if not os.path.exists(local_fname):
                dirname = os.path.dirname(local_fname)
                if not os.path.exists(dirname):
                    os.makedirs(dirname)
                # only download when there's a cache miss
                if not os.path.exists(local_fname) or not os.path.isfile(local_fname):
                    print(f"\nDownloading {remote_fname}...")
                    CLIENT.download_file("ossci-metrics", remote_fname, local_fname)


def parse_and_export_stats(folder: str, granularity: str, commit_sha_lists: Optional[List[str]] = None):
    # Walk the local cache (layout: .../<sha>/<job_name>/<report>.bz2), decompress each
    # report, and accumulate the per-report dataframes into one summary dataframe.
    dataframe = None
    for (dirpath, _, filenames) in os.walk(folder):
        for filename in tqdm(filenames):
            splits = dirpath.split("/")
            job_name = splits[-1]
            sha = splits[-2]
            if not commit_sha_lists or sha in commit_sha_lists:
                with bz2.open(os.path.join(dirpath, filename), 'r') as zf:
                    string = zf.read().decode("utf-8")
                    data = json.loads(string)
                    # create a deep json with sha and job info
                    data['sha'] = sha
                    data['job'] = job_name
                    df = _json_to_df(data, granularity)
                    # DataFrame.append was removed in pandas 2.x; concat is equivalent here
                    dataframe = df if dataframe is None else pd.concat([dataframe, df])
    return dataframe


def main():
    parser = argparse.ArgumentParser(
        __file__,
        description="download and cache test stats locally, both raw and pandas format",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        '--lookback',
        type=int,
        help='lookback in # of hours',
        default=24,
    )
    parser.add_argument(
        '--output',
        help='output filename',
        default='cache/df.pickle',
    )
    parser.add_argument(
        '--cache_folder',
        help='cache folder',
        default='cache',
    )
    parser.add_argument(
        '--granularity',
        choices=['file', 'suite', 'case'],
        help='granularity of stats summary',
        default='file',
    )
    args = parser.parse_args()

    lookback = args.lookback
    cache_folder = args.cache_folder
    output = args.output
    granularity = args.granularity
print("Downloading test stats") download_stats(cache_folder, lookback) print("Parsing test stats and write to pd dataframe") if not os.path.exists(output): dataframe = parse_and_export_stats(f'{cache_folder}/test_time/', granularity) dataframe.to_pickle(output) if __name__ == "__main__": main()