# bot/code_coverage_bot/trigger_missing.py
#
# Imports reconstructed from how the names are used below; the in-repo module
# paths (code_coverage_bot.*, code_coverage_tools.gcp) are assumptions based
# on the identifiers used, not verified against the original file header.
import io
import os
from datetime import datetime, timedelta

import requests
import structlog
import zstandard
from taskcluster.utils import slugId

from code_coverage_bot import hgmo, taskcluster, uploader, utils
from code_coverage_bot.secrets import secrets
from code_coverage_tools.gcp import get_bucket

logger = structlog.get_logger(__name__)

# `MAXIMUM_TRIGGERS` and `trigger_task` live elsewhere in this file; the cap
# below is a placeholder value, and a hedged sketch of `trigger_task` follows
# the function.
MAXIMUM_TRIGGERS = 7


def trigger_missing(server_address: str, out_dir: str = ".") -> None:
    """Trigger coverage ingestion for recent mozilla-central pushes that do
    not have a covdir report in GCS yet."""
    triggered_revisions_path = os.path.join(out_dir, "triggered_revisions.zst")

    # Seed the set of already-triggered revisions from the artifact uploaded
    # by the previous cron run, if one exists.
    url = f"https://firefox-ci-tc.services.mozilla.com/api/index/v1/task/project.relman.code-coverage.{secrets[secrets.APP_CHANNEL]}.crontrigger.latest/artifacts/public/triggered_revisions.zst"
    r = requests.head(url, allow_redirects=True)
    if r.status_code != 404:
        utils.download_file(url, triggered_revisions_path)

    try:
        dctx = zstandard.ZstdDecompressor()
        with open(triggered_revisions_path, "rb") as zf:
            with dctx.stream_reader(zf) as reader:
                with io.TextIOWrapper(reader, encoding="ascii") as f:
                    triggered_revisions = set(f.read().splitlines())
    except FileNotFoundError:
        triggered_revisions = set()
    # Get all mozilla-central pushes from the lookback window (a year on
    # production, 30 days otherwise).
    days = 365 if secrets[secrets.APP_CHANNEL] == "production" else 30
    a_year_ago = datetime.utcnow() - timedelta(days=days)
    with hgmo.HGMO(server_address=server_address) as hgmo_server:
        data = hgmo_server.get_pushes(
            startDate=a_year_ago.strftime("%Y-%m-%d"), full=False, tipsonly=True
        )

    revisions = [
        (push_data["changesets"][0], int(push_data["date"]))
        for push_data in data["pushes"].values()
    ]

    logger.info(f"{len(revisions)} pushes in the past year")
    assert (
        secrets[secrets.GOOGLE_CLOUD_STORAGE] is not None
    ), "Missing GOOGLE_CLOUD_STORAGE secret"
    bucket = get_bucket(secrets[secrets.GOOGLE_CLOUD_STORAGE])

    missing_revisions = []
    for revision, timestamp in revisions:
        # Skip revisions that have already been triggered. If they are still
        # missing, it means there is a problem that is preventing us from
        # ingesting them.
        if revision in triggered_revisions:
            continue

        # If the revision was already ingested, we don't need to trigger
        # ingestion for it again.
        if uploader.gcp_covdir_exists(bucket, "mozilla-central", revision, "all", "all"):
            triggered_revisions.add(revision)
            continue

        missing_revisions.append((revision, timestamp))

    logger.info(f"{len(missing_revisions)} missing pushes in the past year")
    yesterday = int(datetime.timestamp(datetime.utcnow() - timedelta(days=1)))

    task_group_id = slugId()
    logger.info(f"Triggering tasks in the {task_group_id} group")
    triggered = 0
    for revision, timestamp in reversed(missing_revisions):
        # If the push is older than a day, we assume its task group finished.
        # If it is newer, load the group and check whether all coverage tasks
        # in it have finished before triggering ingestion.
        if timestamp > yesterday:
            decision_task_id = taskcluster.get_decision_task("mozilla-central", revision)
            if decision_task_id is None:
                continue

            group = taskcluster.get_task_details(decision_task_id)["taskGroupId"]
            if not all(
                task["status"]["state"] in taskcluster.FINISHED_STATUSES
                for task in taskcluster.get_tasks_in_group(group)
                if taskcluster.is_coverage_task(task["task"])
            ):
                continue

        trigger_task(task_group_id, revision)
        triggered_revisions.add(revision)
        triggered += 1
        if triggered == MAXIMUM_TRIGGERS:
            break
    # Persist the updated set so the next run can skip these revisions.
    cctx = zstandard.ZstdCompressor(threads=-1)
    with open(triggered_revisions_path, "wb") as zf:
        with cctx.stream_writer(zf) as compressor:
            with io.TextIOWrapper(compressor, encoding="ascii") as f:
                f.write("\n".join(triggered_revisions))