in sync/notify/bugupdate.py [0:0]
def updated_bugs(self, bug_ids):
    """Get a list of all bugs which are associated with wpt results and have had their
    resolution changed since the last update time.

    Each matching bug is returned at most once, even if its resolution
    changed several times since the last update.

    Errors raised by ``raise_for_status()`` on the search request or on any
    history request propagate to the caller.

    :param bug_ids: List of candidate bugs to check
    :returns: List of ``bugsy.Bug`` objects, one per bug that has a
              resolution change newer than ``self.last_update`` (or any
              recorded resolution change when ``self.last_update`` is unset).
    :raises AssertionError: If a history response contains more than one bug.
    """
    rv = []

    params = {}
    update_date = None
    if self.last_update:
        # Only the date part is sent, so the server-side filter is coarser
        # than self.last_update; exact timestamps are re-checked below.
        update_date = self.last_update.strftime("%Y-%m-%d")
        params["chfieldfrom"] = update_date

    if bug_ids:
        # TODO: this could make the query over-long; we should probably split
        # into multiple queries
        params["bug_id"] = ",".join(str(item) for item in bug_ids)

    search_resp = env.bz.bugzilla.session.get("%s/rest/bug" % bugzilla_url,
                                              params=params)
    search_resp.raise_for_status()
    search_data = search_resp.json()

    if self.last_update:
        history_params = {"new_since": update_date}
    else:
        history_params = {}

    for bug in search_data.get("bugs", []):
        # Skip bugs that have not been touched since the last update; this
        # avoids a history request for each of them.
        if (self.last_update and
                from_iso_str(bug["last_change_time"]) <= self.last_update):
            continue

        history_resp = env.bz.bugzilla.session.get(
            "{}/rest/bug/{}/history".format(bugzilla_url, bug["id"]),
            params=history_params)
        history_resp.raise_for_status()
        history_data = history_resp.json()
        bugs = history_data.get("bugs")
        if not bugs:
            continue
        # The history was requested for a single bug id.
        assert len(bugs) == 1

        for entry in bugs[0].get("history", []):
            if self.last_update and from_iso_str(entry["when"]) <= self.last_update:
                continue
            if any(change["field_name"] == "resolution" for change in entry["changes"]):
                rv.append(bugsy.Bug(env.bz.bugzilla, **bug))
                # One qualifying change is enough; stop here so the same bug
                # is not appended again for later resolution changes.
                break
    return rv