in sync/downstream.py [0:0]
def next_action(self) -> DownstreamAction:
"""Work out the next action for the sync based on the current status.
Returns a DownstreamAction indicating the next step to take."""
if self.data.get("force-metadata-ready"):
# This is mostly for testing
return DownstreamAction.ready
if self.skip:
return DownstreamAction.ready
if self.error:
if self.tried_to_rebase is False:
return DownstreamAction.try_rebase
return DownstreamAction.manual_fix
latest_try_push = self.latest_valid_try_push
if (latest_try_push and not latest_try_push.taskgroup_id):
if latest_try_push.status == "open":
return DownstreamAction.wait_try
elif latest_try_push.infra_fail:
if self.tried_to_rebase is False:
return DownstreamAction.try_rebase
return DownstreamAction.manual_fix
assert self.pr is not None
pr = env.gh_wpt.get_pull(self.pr)
if pr.merged: # Wait till PR is merged to do anything
if not latest_try_push:
if self.requires_stability_try:
logger.debug("Sync for PR %s requires a stability try push" % self.pr)
return DownstreamAction.try_push_stability
elif self.requires_try:
return DownstreamAction.try_push
else:
return DownstreamAction.ready
if latest_try_push.status != "complete":
return DownstreamAction.wait_try
if self.requires_stability_try and not latest_try_push.stability:
return DownstreamAction.try_push_stability
# If we have infra failure, flag for human intervention. Retrying stability
# runs would be very costly
if latest_try_push.infra_fail:
tasks = latest_try_push.tasks()
if tasks is None:
return DownstreamAction.manual_fix
# Check if we had any successful tests
if tasks.has_completed_tests():
return DownstreamAction.ready
else:
return DownstreamAction.manual_fix
return DownstreamAction.ready
else:
return DownstreamAction.wait_upstream