utils/taskcluster_downloader.py (189 lines of code) (raw):
"""
Downloads artifacts from a Taskcluster Task Group. This command supports the following modes:
- logs
- evals
- model
"""
import os
import sys
from typing import Any
# Absolute path of the directory that contains this script.
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
# Put the parent directory on sys.path so that the `pipeline` package imported
# below (pipeline.common.downloads) can be resolved when this file is run as a
# script. This must run before that import, which is why it sits among the imports.
sys.path.append(os.path.join(CURRENT_DIR, ".."))
import argparse
import csv
import enum
import os
import re
import requests
import taskcluster
from pipeline.common.downloads import stream_download_to_file
from taskcluster.download import downloadArtifactToBuf
TC_MOZILLA = "https://firefox-ci-tc.services.mozilla.com"
DATA_DIR = os.path.abspath(os.path.join(CURRENT_DIR, "../data"))
# Parses the label of an evaluation task into its components.
# Examples:
#   evaluate-teacher-flores-flores_aug-title_devtest-lt-en-1_2
#   evaluate-teacher-mtdata-mtdata_aug-mix_Neulab-tedtalks_test-1-eng-lit-lt-en-1_2
#   evaluate-finetuned-student-sacrebleu-wmt19-lt-en
eval_regex = re.compile(
    r"evaluate-"
    r"(?P<model>finetuned-student|teacher-ensemble|teacher|student|quantized|backward)"
    r"-"
    r"(?P<importer>flores|mtdata|sacrebleu)"
    r"(?P<extra_importer>-flores|-mtdata|-sacrebleu)?"
    r"[_-]"
    r"(?P<aug>aug-[^_]+)?"
    r"_?"
    # Lazy, so the dataset stops at the first position where a language pair follows.
    # (\w already includes "_".)
    r"(?P<dataset>[-\w]*?)"
    r"-"
    # Optional 3-letter language pair embedded in some dataset names (e.g. "eng-lit").
    # It is its own group and no longer swallows the 2-letter `lang` group below.
    r"(?P<extra_lang>[a-z]{3}-[a-z]{3})?"
    r"-?"
    r"(?P<lang>[a-z]{2}-[a-z]{2})"
    r"-?"
    r"(?P<suffix>[\d_]+)?"
)
@enum.unique
class Mode(enum.Enum):
    """What kind of artifacts to download; selected with the ``--mode`` flag."""

    # Marian training logs of train/finetune tasks.
    logs = "logs"
    # Evaluation metrics written to a CSV file.
    evals = "evals"
    # Files of the exported models.
    model = "model"
def download_logs(group_id, output):
    """Download the Marian training logs of the train/finetune tasks in a task group.

    Only tasks in the ``completed`` or ``running`` state whose ``kind`` tag contains
    "train" or "finetune" (and not "vocab") are considered. For each one, the latest
    ``public/logs/live.log`` artifact is streamed. Lines before the first line that
    contains ``[marian]`` are dropped, and the ``[task ...Z] `` prefix is removed.
    The result is written to ``<output>/<task label>.log``, with ``/`` in the label
    replaced by ``_``. Logs that cannot be fetched (non-2xx response) are skipped
    with a message.

    Args:
        group_id: Taskcluster task group ID.
        output: Directory to write the log files into; created when needed.
    """
    queue = taskcluster.Queue(options={"rootUrl": TC_MOZILLA})
    # Taskcluster's "[task <timestamp>Z] " line prefix. `[^\]]*` keeps the match
    # inside the brackets so real log text later in the line is never removed.
    task_prefix = re.compile(r"\[task [^\]]*Z\] ")

    # listTaskGroup is paginated; follow continuation tokens so large groups are
    # not silently truncated to the first page.
    page: Any = queue.listTaskGroup(group_id)
    tasks = list(page["tasks"])
    while page.get("continuationToken"):
        page = queue.listTaskGroup(
            group_id, query={"continuationToken": page["continuationToken"]}
        )
        tasks.extend(page["tasks"])

    task_found = False
    for task in tasks:
        if task["status"]["state"] not in ("completed", "running"):
            continue
        kind = task["task"]["tags"]["kind"]
        if ("train" not in kind and "finetune" not in kind) or "vocab" in kind:
            continue
        task_found = True
        task_id = task["status"]["taskId"]
        task_obj: Any = queue.task(task_id)
        task_obj_label = task_obj["tags"]["label"].replace("/", "_")

        os.makedirs(output, exist_ok=True)
        output_path = os.path.join(output, f"{task_obj_label}.log")
        url = queue.buildUrl("getLatestArtifact", task_id, "public/logs/live.log")
        print(f"Downloading {url}")

        log_lines = []
        start_writing = False
        # The response is used as a context manager so the streamed connection is released.
        with requests.get(url, stream=True, timeout=5) as resp:
            if not resp.ok:
                print(f"Failed to download {url}: HTTP {resp.status_code}, skipping")
                continue
            try:
                for line in resp.iter_lines():
                    line_str = line.decode("utf-8", errors="replace")
                    if "[marian]" in line_str:
                        start_writing = True
                    if start_writing:
                        log_lines.append(task_prefix.sub("", line_str))
            except requests.exceptions.ConnectionError:
                # Streaming can be cut short (e.g. by the 5s read timeout while a
                # running task's log is idle). Keep what was received so far
                # rather than failing the whole run.
                print(f"Connection to {url} was interrupted; the log may be incomplete")

        print(f"Writing to {output_path}")
        with open(output_path, "w", encoding="utf-8") as f:
            f.write("\n".join(log_lines))

    if not task_found:
        print(f"No logs were found for {group_id}")
def donwload_evals(group_id, output):
    """Collect the evaluation metrics of a task group into a CSV file.

    Every completed task whose ``kind`` tag contains "evaluate" is inspected.
    - Its label is parsed with ``eval_regex`` to get the model, importer, dataset,
      augmentation and suffix.
    - Its first ``*.metrics`` artifact is downloaded, and the first two lines are
      read as the BLEU and chrF scores. Tasks without such an artifact are skipped
      with a message.
    - Rows are written to ``<output>/<group_id>-metrics.csv``.

    The function name keeps its historical misspelling for existing callers;
    ``download_evals`` is a correctly-spelled alias.

    Args:
        group_id: Taskcluster task group ID.
        output: Directory for the CSV file; created when needed.

    Raises:
        ValueError: If an evaluation task label does not match ``eval_regex``, or
            a metrics artifact has fewer than two lines.
    """
    queue = taskcluster.Queue(options={"rootUrl": TC_MOZILLA})

    # listTaskGroup is paginated; follow continuation tokens so large groups are
    # not silently truncated to the first page.
    page: Any = queue.listTaskGroup(group_id)
    tasks = list(page["tasks"])
    while page.get("continuationToken"):
        page = queue.listTaskGroup(
            group_id, query={"continuationToken": page["continuationToken"]}
        )
        tasks.extend(page["tasks"])

    results = []
    for task in tasks:
        if task["status"]["state"] != "completed":
            continue
        if "evaluate" not in task["task"]["tags"]["kind"]:
            continue
        task_id = task["status"]["taskId"]
        task_obj: Any = queue.task(task_id)
        task_obj_label = task_obj["tags"]["label"].replace("/", "_")

        # Parse the label first so an unexpected label fails before any download.
        match = eval_regex.match(task_obj_label)
        if not match:
            raise ValueError(f"Cannot match {task_obj_label}")

        artifacts_response: Any = queue.listLatestArtifacts(task_id)
        artifact_name = next(
            (
                artifact["name"]
                for artifact in artifacts_response["artifacts"]
                if artifact["name"].endswith(".metrics")
            ),
            None,
        )
        if artifact_name is None:
            print(f"No .metrics artifact found for {task_obj_label}, skipping")
            continue

        print(f"Downloading {artifact_name} for {task_obj_label}")
        content, _ = downloadArtifactToBuf(
            taskId=task_id,
            name=artifact_name,
            queueService=queue,
        )
        # bytes() accepts bytes, bytearray and memoryview alike.
        metric_lines = bytes(content).decode("utf-8").splitlines()
        if len(metric_lines) < 2:
            raise ValueError(
                f"Unexpected metrics format in {artifact_name} for {task_obj_label}: "
                f"{metric_lines!r}"
            )
        # The first line is BLEU and the second is chrF; any further lines are ignored.
        bleu, chrf = metric_lines[0], metric_lines[1]

        groups = match.groupdict()
        model = groups["model"]
        suffix = groups.get("suffix") or ""
        augmentation = groups.get("aug") or ""
        dataset = f"{groups['importer']}_{groups['dataset']}"
        result = (model + suffix, dataset, augmentation, bleu, chrf)
        print(f"Result: {result}")
        results.append(result)

    os.makedirs(output, exist_ok=True)
    output_path = os.path.join(output, f"{group_id}-metrics.csv")
    # The csv module requires newline="" (otherwise rows get blank lines on Windows).
    with open(output_path, "w", newline="", encoding="utf-8") as csvfile:
        csv_writer = csv.writer(csvfile)
        csv_writer.writerow(["Model", "Dataset", "Augmentation", "BLEU", "chrF"])
        csv_writer.writerows(results)


# Correctly-spelled alias; `donwload_evals` is kept for existing callers.
download_evals = donwload_evals
def download_model(group_id: str, output: str):
    """Download the exported model files of every completed ``export`` task in a group.

    For each completed task whose ``kind`` tag is ``export``, all latest artifacts
    except ``*.log`` files are downloaded into ``<output>/<language pair>``. The
    language pair is the task name with its ``export-`` prefix removed. Each
    artifact is saved under its base name. Prints where the files ended up, or a
    message when the group contains no completed export task.

    Args:
        group_id: Taskcluster task group ID.
        output: Directory under which one sub-directory per exported model is created.
    """
    queue = taskcluster.Queue(options={"rootUrl": TC_MOZILLA})

    # listTaskGroup is paginated; follow continuation tokens so large groups are
    # not silently truncated to the first page.
    page: Any = queue.listTaskGroup(group_id)
    tasks = list(page["tasks"])
    while page.get("continuationToken"):
        page = queue.listTaskGroup(
            group_id, query={"continuationToken": page["continuationToken"]}
        )
        tasks.extend(page["tasks"])

    prefix = "export-"
    model_paths = []
    for task in tasks:
        if task["status"]["state"] != "completed":
            continue
        if task["task"]["tags"]["kind"] != "export":
            continue
        task_id = task["status"]["taskId"]
        task_name = task["task"]["metadata"]["name"]
        # Strip only the leading "export-", not every occurrence in the name.
        language_pair = task_name[len(prefix):] if task_name.startswith(prefix) else task_name

        artifacts_response: Any = queue.listLatestArtifacts(task_id)
        model_artifacts = [
            artifact
            for artifact in artifacts_response["artifacts"]
            if not artifact["name"].endswith(".log")
        ]
        model_path = os.path.join(output, language_pair)
        os.makedirs(model_path, exist_ok=True)
        print(f'Downloading models from "{task_name}": {TC_MOZILLA}/tasks/{task_id}')
        for artifact in model_artifacts:
            url = queue.buildUrl("getLatestArtifact", task_id, artifact["name"])
            path = os.path.join(model_path, os.path.basename(artifact["name"]))
            stream_download_to_file(url, path)
        model_paths.append(model_path)

    # Without this guard, a group with no export task raised UnboundLocalError here.
    if not model_paths:
        print(f"No exported models were found for {group_id}")
        return
    for model_path in model_paths:
        print(f"Model files are available at: {model_path}")
def main() -> None:
    """Parse the command line and run the selected download mode.

    Without ``--output``, files are written to ``DATA_DIR/taskcluster-<mode>``.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the line breaks of the module docstring's list of modes.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--output",
        metavar="OUTPUT",
        type=str,
        help="Output directory for the downloaded files. "
        "Defaults to data/taskcluster-<mode>.",
    )
    parser.add_argument(
        "--task-group-id",
        metavar="TASK_GROUP_ID",
        required=True,
        type=str,
        help="ID of a Taskcluster task group",
    )
    parser.add_argument(
        "--mode",
        metavar="MODE",
        type=Mode,
        choices=Mode,
        required=True,
        help="What to download: " + ", ".join([m.name for m in Mode]),
    )
    args = parser.parse_args()
    group_id: str = args.task_group_id
    mode: Mode = args.mode
    output: str = args.output or os.path.join(DATA_DIR, f"taskcluster-{mode.value}")

    # One handler per mode; each takes (group_id, output).
    handlers = {
        Mode.logs: download_logs,
        Mode.evals: donwload_evals,
        Mode.model: download_model,
    }
    handlers[mode](group_id, output)
if __name__ == "__main__":
main()