in probe_scraper/probe_expiry_alert.py [0:0]
def main(current_date: datetime.date, dryrun: bool, bugzilla_api_key: str) -> None:
    """Find probes expiring in the next nightly version and notify their owners.

    Downloads and parses the events, histograms and scalars definition files,
    collects the probes that ``find_expiring_probes`` reports for the next
    nightly version, files bugs for them via ``file_bugs`` and passes the
    emails that have no Bugzilla account to ``send_emails``.

    Args:
        current_date: Date of the run. On any day other than Wednesday the
            run is forced into dry-run mode.
        dryrun: Forwarded to ``file_bugs`` and ``send_emails``; forced to
            ``True`` when ``current_date`` is not a Wednesday.
        bugzilla_api_key: API key forwarded to the Bugzilla helpers.
    """
    # Only file bugs on Wednesdays (weekday() == 2); on other days everything
    # still runs as a dry run for debugging/error detection.
    dryrun = dryrun or current_date.weekday() != 2

    current_version = str(int(get_latest_nightly_version()))
    next_version = str(int(current_version) + 1)

    # The downloaded definition files are only read by the parsers, so the
    # temporary directory is released as soon as parsing is done.
    with tempfile.TemporaryDirectory() as tempdir:
        events_file_path = os.path.join(tempdir, EVENTS_FILE)
        download_file(BASE_URI + EVENTS_FILE, events_file_path)
        events = EventsParser().parse([events_file_path])

        histograms_file_path = os.path.join(tempdir, HISTOGRAMS_FILE)
        download_file(BASE_URI + HISTOGRAMS_FILE, histograms_file_path)
        histograms = HistogramsParser().parse(
            [histograms_file_path], version=next_version
        )

        scalars_file_path = os.path.join(tempdir, SCALARS_FILE)
        download_file(BASE_URI + SCALARS_FILE, scalars_file_path)
        scalars = ScalarsParser().parse([scalars_file_path])

    # Merge on a copy so the parsed events mapping itself is not mutated.
    all_probes = events.copy()
    all_probes.update(histograms)
    all_probes.update(scalars)

    expiring_probes = find_expiring_probes(all_probes, next_version, bugzilla_api_key)

    print(f"Found {len(expiring_probes)} probes expiring in nightly {next_version}")
    print([probe.name for probe in expiring_probes])

    # Find emails with no Bugzilla account. Each distinct email is looked up
    # at most once per run, whether or not the account exists.
    emails_wo_accounts = defaultdict(list)
    account_exists = {}
    for probe in expiring_probes:
        # Iterate over a copy because probe.emails is mutated inside the loop.
        for email in probe.emails.copy():
            if email not in account_exists:
                account_exists[email] = check_bugzilla_user_exists(
                    email, bugzilla_api_key
                )
            if not account_exists[email]:
                emails_wo_accounts[email].append(probe.name)
                # Drop the address from the probe before bugs are filed;
                # these addresses are handed to send_emails below instead.
                probe.emails.remove(email)

    probe_to_bug_id = file_bugs(
        expiring_probes, current_version, bugzilla_api_key, dryrun=dryrun
    )
    send_emails(emails_wo_accounts, probe_to_bug_id, current_version, dryrun=dryrun)