in mozperftest_tools/mozperftest_tools/lull_scheduler.py [0:0]
def fetch_lull_schedule_tasks(self, task_id):
    """Collect every task that declares a lull-schedule in a task graph.

    The full-task-graph.json of the given decision task is read from the
    local cache when a cached copy exists and caching is enabled.
    Otherwise it is downloaded and, when caching is enabled, written to
    the cache for later calls.

    :param str task_id: ID of the decision task whose
        full-task-graph.json should be inspected.
    :return dict: Maps each task key of the graph that has a
        "lull-schedule" entry under task -> extra to that entry
        converted with schedule_to_timedelta.
    """
    graph_url = FTG_URL.format(task_id)
    cache_file = pathlib.Path(self.cache_path) / f"{task_id}-ftg.json"

    if not cache_file.exists() or not self.use_cache:
        print(f"Downloading full-task-graph.json from: {graph_url}")
        graph = fetch_data(graph_url)
        if self.use_cache:
            with cache_file.open("w") as cache_handle:
                json.dump(graph, cache_handle)
    else:
        graph = json.loads(cache_file.read_text())

    # Lazily pair every task key with its "extra" section, so that each
    # task is still fully processed before the next one is looked at.
    extras_by_label = (
        (label, definition.get("task", {}).get("extra", {}))
        for label, definition in graph.items()
    )
    return {
        label: schedule_to_timedelta(extra["lull-schedule"])
        for label, extra in extras_by_label
        if "lull-schedule" in extra
    }