in sync/downstream.py [0:0]
def try_notify(self, force: bool = False) -> None:
newrelic.agent.record_custom_event("try_notify", params={
"sync_bug": self.bug,
"sync_pr": self.pr
})
if self.results_notified and not force:
return
if not self.bug:
logger.error("Sync for PR %s has no associated bug" % self.pr)
return
if not self.affected_tests():
logger.debug("PR %s doesn't have affected tests so skipping results notification" %
self.pr)
newrelic.agent.record_custom_event("try_notify_no_affected", params={
"sync_bug": self.bug,
"sync_pr": self.pr
})
return
logger.info("Trying to generate results notification for PR %s" % self.pr)
results = notify.results.for_sync(self)
if not results:
# TODO handle errors here better, perhaps
logger.error("Failed to get results notification for PR %s" % self.pr)
newrelic.agent.record_custom_event("try_notify_failed", params={
"sync_bug": self.bug,
"sync_pr": self.pr
})
return
message, truncated = notify.msg.for_results(results)
with env.bz.bug_ctx(self.bug) as bug:
if truncated:
bug.add_attachment(data=message.encode("utf8"),
file_name="wpt-results.md",
summary="Notable wpt changes",
is_markdown=True,
comment=truncated)
else:
env.bz.comment(self.bug, message, is_markdown=True)
bugs = notify.bugs.for_sync(self, results)
notify.bugs.update_metadata(self, bugs)
self.results_notified = True # type: ignore
with SyncLock.for_process(self.process_name) as lock:
assert isinstance(lock, SyncLock)
for try_push in self.try_pushes():
with try_push.as_mut(lock):
try_push.cleanup_logs()