in backend/code_review_backend/issues/management/commands/load_issues.py [0:0]
def load_tasks(self, environment, chunk=200):
    """Yield ``(task_id, artifact)`` for each indexed task that reported issues.

    Pages through the Taskcluster index namespace built from ``INDEX_PATH``
    and, for every task whose index data has a non-empty ``issues`` entry,
    loads its ``public/results/report.json`` artifact. Artifacts are read
    from ``self.cache_dir`` when a file named after the task id exists
    there; otherwise they are downloaded, validated and written to the
    cache before being yielded.

    Args:
        environment: Value substituted into ``INDEX_PATH`` to select the
            index namespace to list.
        chunk: Page size passed as ``limit`` to each ``listTasks`` call.

    Yields:
        Tuples of ``(task_id, artifact)`` where ``artifact`` is the decoded
        JSON report.

    Raises:
        HTTPError: If downloading an artifact fails with a status other
            than 404 (a 404 is logged and the task is skipped).
        AssertionError: If a downloaded artifact's ``revision`` lacks one
            of the required keys.
    """
    # Direct unauthenticated usage
    root_url = "https://firefox-ci-tc.services.mozilla.com/"
    index = taskcluster.Index({"rootUrl": root_url})
    queue = taskcluster.Queue({"rootUrl": root_url})

    # The namespace does not change between pages, so build it once.
    namespace = INDEX_PATH.format(environment=environment)
    required_keys = ("repository", "target_repository", "mercurial_revision")

    token = None
    while True:
        query = {"limit": chunk}
        if token is not None:
            query["continuationToken"] = token
        data = index.listTasks(namespace, query=query)

        for task in data["tasks"]:
            if not task["data"].get("issues"):
                continue

            # Lookup artifact in cache
            path = os.path.join(self.cache_dir, task["taskId"])
            if os.path.exists(path):
                # Use a context manager so the file handle is always closed.
                with open(path, encoding="utf-8") as f:
                    artifact = json.load(f)
            else:
                # Download the task report
                logging.info(f"Download task {task['taskId']}")
                try:
                    url = queue.buildUrl(
                        "getLatestArtifact",
                        task["taskId"],
                        "public/results/report.json",
                    )
                    # Allows HTTP_30x redirections retrieving the artifact
                    response = queue.session.get(
                        url, stream=True, allow_redirects=True
                    )
                    try:
                        response.raise_for_status()
                        artifact = response.json()
                    finally:
                        # A streamed response holds its connection until it
                        # is fully read or closed; close it explicitly so
                        # error paths (e.g. 404) do not leak connections.
                        response.close()
                except HTTPError as e:
                    if (
                        getattr(getattr(e, "response", None), "status_code", None)
                        == 404
                    ):
                        logging.info(f"Missing artifact : {repr(e)}")
                        continue
                    raise

                # Check the artifact has repositories & revision.
                # Raised explicitly (rather than via ``assert``) so the check
                # still runs under ``python -O``; AssertionError is kept so
                # the exception type seen by callers is unchanged.
                revision = artifact["revision"]
                for key in required_keys:
                    if key not in revision:
                        raise AssertionError(f"Missing {key}")

                # Store artifact in cache
                with open(path, "w", encoding="utf-8") as f:
                    json.dump(artifact, f, sort_keys=True, indent=4)

            yield task["taskId"], artifact

        token = data.get("continuationToken")
        if token is None:
            break