variance-analysis/mach_perftest_notebook_dev/perftestnotebook/artifact_downloader.py

import argparse
import glob
import json
import os
import shutil
import threading
import time
import zipfile

import requests

try:
    from urllib.parse import urlencode
    from urllib.request import urlopen, urlretrieve
except ImportError:
    from urllib import urlencode, urlretrieve
    from urllib2 import urlopen

# Use this program to download, extract, and distribute GRCOV
# files that are to be used for the variability analysis.

# Use just the groupID, it absolutely needs to be given. With that, get the task
# details for the entire group, and find all the tests specified with the suite,
# chunk, and mode given through the parser arguments. For each of those tests,
# take the taskId and download the GRCOV data chunk. Keep suffixing them, and
# store a json mapping from numbers to taskIDs for future reference.

# The suite should include the flavor. It makes no sense to aggregate the data
# from multiple flavors together because they don't run the same tests. This is
# also why you cannot specify more than one suite and chunk.


def artifact_downloader_parser():
    parser = argparse.ArgumentParser(
        description="This tool can download the GRCOV data from a group of linux64-ccov "
        + "taskcluster tasks. It then extracts the data, suffixes it with "
        + "a number and then stores it in an output directory."
    )
    parser.add_argument(
        "--task-group-id",
        type=str,
        nargs=1,
        help="The group of tasks that should be parsed to find all the necessary "
        + "data to be used in this analysis.",
    )
    parser.add_argument(
        "--test-suites-list",
        type=str,
        nargs="+",
        help="The list of tests to look at, e.g. mochitest-browser-chrome-e10s-2."
        + " If it's empty we assume that it means nothing; if `all` is given, all"
        + " suites will be processed.",
    )
    parser.add_argument(
        "--artifact-to-get",
        type=str,
        nargs=1,
        default=["grcov"],
        help="Pattern matcher for the artifact you want to download. By default, it"
        + " is set to `grcov` to get ccov artifacts. Use `per_test_coverage` to get"
        + " data from test-coverage tasks.",
    )
    parser.add_argument(
        "--unzip-artifact",
        action="store_true",
        default=False,
        help="Pass this flag if you want the downloaded artifact to be extracted.",
    )
    parser.add_argument(
        "--platform",
        type=str,
        default="test-linux64-ccov",
        help="Platform to obtain data from.",
    )
    parser.add_argument(
        "--download-failures",
        action="store_true",
        default=False,
        help="Download artifacts from failed tasks as well, instead of skipping them.",
    )
    parser.add_argument(
        "--ingest-continue",
        action="store_true",
        default=False,
        help="Continues from the same run it was doing before.",
    )
    parser.add_argument(
        "--output",
        type=str,
        nargs=1,
        help="This is the directory where all the downloaded, extracted, and suffixed "
        + "data will reside.",
    )
    return parser
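
# An illustrative invocation, for reference only. The task group ID and output
# path below are hypothetical placeholders, not values taken from a real run:
#
#   python artifact_downloader.py \
#       --task-group-id <TASK_GROUP_ID> \
#       --test-suites-list mochitest-browser-chrome-e10s-2 \
#       --artifact-to-get grcov \
#       --unzip-artifact \
#       --output /path/to/output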


# Used to limit the number of concurrent data requests
START_TIME = time.time()
MAX_REQUESTS = 20
CURR_REQS = 0
RETRY = 5

TOTAL_TASKS = 0
CURR_TASK = 0

FAILED = []
ALL_TASKS = []


def log(msg):
    global CURR_TASK
    global TOTAL_TASKS

    elapsed_time = time.time() - START_TIME
    val = time.strftime("%H:%M:%S", time.gmtime(elapsed_time))
    pc = round((CURR_TASK / TOTAL_TASKS) * 100, 1) if TOTAL_TASKS else 0
    print(
        "[%s][INFO] %s/%s %s - %s"
        % (val, str(CURR_TASK + 1), str(TOTAL_TASKS), pc, msg)
    )


def warning(msg):
    global CURR_TASK
    global TOTAL_TASKS

    elapsed_time = time.time() - START_TIME
    val = time.strftime("%H:%M:%S", time.gmtime(elapsed_time))
    pc = round((CURR_TASK / TOTAL_TASKS) * 100, 1) if TOTAL_TASKS else 0
    print(
        "[%s][WARNING] %s/%s %s - %s"
        % (val, str(CURR_TASK + 1), str(TOTAL_TASKS), pc, msg)
    )


def get_json(url, params=None):
    if params is not None:
        url += "?" + urlencode(params)

    r = urlopen(url).read().decode("utf-8")

    return json.loads(r)


def get_task_details(task_id):
    task_details = get_json("https://queue.taskcluster.net/v1/task/" + task_id)
    return task_details


def get_task_artifacts(task_id):
    artifacts = get_json(
        "https://queue.taskcluster.net/v1/task/" + task_id + "/artifacts"
    )
    return artifacts["artifacts"]


def get_tasks_in_group(group_id):
    reply = get_json(
        "https://queue.taskcluster.net/v1/task-group/" + group_id + "/list",
        {"limit": "200"},
    )
    tasks = reply["tasks"]
    while "continuationToken" in reply:
        reply = get_json(
            "https://queue.taskcluster.net/v1/task-group/" + group_id + "/list",
            {"limit": "200", "continuationToken": reply["continuationToken"]},
        )
        tasks += reply["tasks"]
    return tasks


def download_artifact(task_id, artifact, output_dir):
    global FAILED

    fname = os.path.join(
        output_dir, task_id + "_" + os.path.basename(artifact["name"])
    )
    log("Downloading " + artifact["name"] + " to: " + fname)

    if os.path.exists(fname):
        log("File already exists.")
        return fname

    tries = 0
    url_data = (
        "https://queue.taskcluster.net/v1/task/"
        + task_id
        + "/artifacts/"
        + artifact["name"]
    )

    while tries < RETRY:
        try:
            # Make the actual request
            request = requests.get(url_data, timeout=60, stream=True)

            # Open the output file and make sure we write in binary mode
            with open(fname, "wb") as fh:
                # Walk through the request response in chunks of 1024 * 1024 bytes, so 1MiB
                for chunk in request.iter_content(1024 * 1024):
                    # Write the chunk to the file
                    fh.write(chunk)
            break
        except Exception as e:
            log(
                "Failed to get data from %s: %s - %s"
                % (url_data, e.__class__.__name__, e)
            )
            tries += 1
            if tries < RETRY:
                log("Retrying %s more times..." % str(RETRY - tries))
            else:
                warning("No more retries. Failed to download %s" % url_data)
                FAILED.append(task_id)
                raise

    # urlretrieve(
    #     'https://queue.taskcluster.net/v1/task/' + task_id + '/artifacts/' + artifact['name'],
    #     fname
    # )

    return fname


def suite_name_from_task_name(name):
    psn = name.split("/")[-1]
    psn = "-".join(psn.split("-")[1:])
    return psn


def make_count_dir(a_path):
    os.makedirs(a_path, exist_ok=True)
    return a_path


def unzip_file(abs_zip_path, output_dir, count=0):
    tmp_path = ""
    with zipfile.ZipFile(abs_zip_path, "r") as z:
        tmp_path = os.path.join(output_dir, str(count))
        if not os.path.exists(tmp_path):
            make_count_dir(tmp_path)
        z.extractall(tmp_path)
    return tmp_path


def move_file(abs_filepath, output_dir, count=0):
    tmp_path = os.path.join(output_dir, str(count))
    _, fname = os.path.split(abs_filepath)
    if not os.path.exists(tmp_path):
        make_count_dir(tmp_path)
    if os.path.exists(os.path.join(tmp_path, fname)):
        return
    shutil.copyfile(abs_filepath, os.path.join(tmp_path, fname))
    return tmp_path


def artifact_downloader(
    task_group_id,
    output_dir=os.getcwd(),
    test_suites=[],
    download_failures=False,
    artifact_to_get="grcov",
    unzip_artifact=True,
    platform="test-linux64-ccov",
    ingest_continue=False,
):
    global CURR_REQS
    global CURR_TASK
    global TOTAL_TASKS
    global FAILED
    global ALL_TASKS

    head_rev = ""

    all_tasks = False
    if "all" in test_suites:
        all_tasks = True

    # Make the data directories
    task_dir = os.path.join(output_dir, task_group_id)
    run_number = 0
    max_num = 0
    if not os.path.exists(task_dir):
        os.makedirs(task_dir, exist_ok=True)
    else:
        # Get the current run number
        curr_dir = os.getcwd()
        os.chdir(task_dir)
        dir_list = next(os.walk("."))[1]
        max_num = 0
        for subdir in dir_list:
            run_num = int(subdir)
            if run_num > max_num:
                max_num = run_num
        os.chdir(curr_dir)
        if not ingest_continue and max_num:
            run_number = max_num + 1

    output_dir = os.path.join(output_dir, task_dir, str(run_number))
    os.makedirs(output_dir, exist_ok=True)

    task_ids = []
    log("Getting task group information...")
    tgi_path = os.path.join(output_dir, "task-group-information.json")
    if os.path.exists(tgi_path):
        with open(tgi_path, "r") as f:
            tasks = json.load(f)
    else:
        tasks = get_tasks_in_group(task_group_id)
        with open(tgi_path, "w") as f:
            json.dump(tasks, f, indent=4)
    log("Obtained")

    # Used to keep track of how many grcov files
    # we are downloading per test.
    task_counters = {}
    taskid_to_file_map = {}

    # For each task in this group
    threads = []
    TOTAL_TASKS = len(tasks)
    for task in tasks:
        download_this_task = False

        # Get the test name
        if platform not in task["task"]["metadata"]["name"]:
            continue

        test_name = suite_name_from_task_name(task["task"]["metadata"]["name"])
        log(
            "Found %s with suite-name: %s"
            % (task["task"]["metadata"]["name"], test_name)
        )

        # If all tests weren't asked for but this test is
        # asked for, set the flag.
        if (not all_tasks) and test_name in test_suites:
            download_this_task = True

        if all_tasks or download_this_task:
            # Make directories for this task
            head_rev = task["task"]["payload"]["env"]["GECKO_HEAD_REV"]

            grcov_dir = os.path.join(output_dir, test_name)
            downloads_dir = os.path.join(grcov_dir, "downloads")
            data_dir = os.path.join(
                grcov_dir, artifact_to_get.replace(".", "") + "_data"
            )

            if test_name not in task_counters:
                os.makedirs(grcov_dir, exist_ok=True)
                os.makedirs(downloads_dir, exist_ok=True)
                os.makedirs(data_dir, exist_ok=True)
                task_counters[test_name] = 0
            else:
                task_counters[test_name] += 1

            task_id = task["status"]["taskId"]
            ALL_TASKS.append(task_id)

            def get_artifacts(
                task_id,
                downloads_dir,
                data_dir,
                unzip_artifact,
                test_counter,
                test_name,
                artifact_to_get,
                download_failures,
                taskid_to_file_map,
            ):
                global CURR_REQS

                files = os.listdir(downloads_dir)
                ffound = [f for f in files if artifact_to_get in f and task_id in f]
                if ffound:
                    log("File already exists.")
                    CURR_REQS -= 1

                    # Reuse the artifact that was already downloaded.
                    filen = os.path.join(downloads_dir, ffound[0])
                    if artifact_to_get == "grcov" or unzip_artifact:
                        unzip_file(filen, data_dir, test_counter)
                    else:
                        move_file(filen, data_dir, test_counter)
                    taskid_to_file_map[task_id] = os.path.join(
                        data_dir, str(test_counter)
                    )
                    return filen

                CURR_REQS += 1
                log("Getting task artifacts for %s" % task_id)
                artifacts = get_task_artifacts(task_id)
                CURR_REQS -= 1

                if not download_failures:
                    log("Checking for failures on %s" % task_id)
                    failed = None
                    for artifact in artifacts:
                        if "log_error" in artifact["name"]:
                            CURR_REQS += 1
                            filen = download_artifact(task_id, artifact, downloads_dir)
                            CURR_REQS -= 1
                            if os.stat(filen).st_size != 0:
                                failed = artifact["name"]
                    if failed:
                        log("Skipping a failed test: " + failed)
                        return

                for artifact in artifacts:
                    if artifact_to_get in artifact["name"]:
                        filen = download_artifact(task_id, artifact, downloads_dir)
                        CURR_REQS -= 1
                        if artifact_to_get == "grcov" or unzip_artifact:
                            unzip_file(filen, data_dir, test_counter)
                        else:
                            move_file(filen, data_dir, test_counter)
                        taskid_to_file_map[task_id] = os.path.join(
                            data_dir, str(test_counter)
                        )

                log("Finished %s for %s" % (task_id, test_name))

            CURR_REQS += 1
            t = threading.Thread(
                target=get_artifacts,
                args=(
                    task_id,
                    downloads_dir,
                    data_dir,
                    unzip_artifact,
                    task_counters[test_name],
                    test_name,
                    artifact_to_get,
                    download_failures,
                    taskid_to_file_map,
                ),
            )
            t.daemon = True
            t.start()
            threads.append(t)

            while CURR_REQS >= MAX_REQUESTS:
                time.sleep(1)
                log("Waiting for requests to finish, currently at %s" % str(CURR_REQS))

        CURR_TASK += 1

    for t in threads:
        t.join()

    with open(os.path.join(output_dir, "taskid_to_file_map.json"), "w") as f:
        json.dump(taskid_to_file_map, f, indent=4)

    log("Finished processing.")
    log(
        "Stats: %s PASSED, %s FAILED, %s TOTAL"
        % (str(len(ALL_TASKS) - len(FAILED)), str(len(FAILED)), str(len(ALL_TASKS)))
    )
    if FAILED:
        log(
            "Tasks that failed to have their artifact downloaded: %s"
            % "\n\t".join(FAILED)
        )

    # Return the directory where all the tasks were downloaded to
    # and split into folders.
    return output_dir, head_rev


def main():
    parser = artifact_downloader_parser()
    args = parser.parse_args()

    task_group_id = args.task_group_id[0]
    test_suites = args.test_suites_list or []
    artifact_to_get = args.artifact_to_get[0]
    unzip_artifact = args.unzip_artifact
    platform = args.platform
    download_failures = args.download_failures
    ingest_continue = args.ingest_continue

    output_dir = args.output[0] if args.output is not None else os.getcwd()

    task_dir, head_rev = artifact_downloader(
        task_group_id,
        output_dir=output_dir,
        test_suites=test_suites,
        artifact_to_get=artifact_to_get,
        unzip_artifact=unzip_artifact,
        platform=platform,
        download_failures=download_failures,
        ingest_continue=ingest_continue,
    )

    return task_dir


if __name__ == "__main__":
    main()
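
# A minimal sketch of programmatic use. The task group ID and output directory
# below are hypothetical placeholders; artifact_downloader() returns the
# per-run output directory and the GECKO_HEAD_REV of the downloaded tasks.
#
#   from artifact_downloader import artifact_downloader
#
#   task_dir, head_rev = artifact_downloader(
#       "<TASK_GROUP_ID>",                       # hypothetical task group ID
#       output_dir="/path/to/output",
#       test_suites=["mochitest-browser-chrome-e10s-2"],
#       artifact_to_get="grcov",
#       unzip_artifact=True,
#       platform="test-linux64-ccov",
#       download_failures=False,
#   )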