sync/handlers.py (286 lines of code) (raw):
import newrelic.agent
from . import downstream
from . import log
from . import landing
from . import tc
from . import trypush
from . import update
from . import upstream
from . import worktree
from .env import Environment
from .errors import RetryableError
from .gitutils import pr_for_commit, update_repositories, gecko_repo
from .load import get_pr_sync
from .lock import SyncLock
from .notify import bugupdate
from .repos import cinnabar
from git.repo.base import Repo
from typing import Any, Dict
# Module-wide Environment instance; the handlers below reach env.gh_wpt and
# env.bz through it.
env = Environment()
# Logger obtained from the project's log module, keyed by this module's name.
logger = log.get_logger(__name__)
class Handler:
    """Base class for the callable event handlers defined in this module.

    A handler instance is invoked as ``handler(git_gecko, git_wpt, body)``;
    subclasses provide the actual behaviour by overriding ``__call__``.
    """

    def __init__(self, config):
        """Keep ``config`` available to the handler as ``self.config``."""
        self.config = config

    def __call__(self, git_gecko: Repo, git_wpt: Repo, body: Dict[str, Any]) -> None:
        """Process one message ``body``.

        The base implementation has no behaviour of its own and always raises
        ``NotImplementedError``.
        """
        raise NotImplementedError()
def handle_pr(git_gecko: Repo, git_wpt: Repo, event: Dict[str, Any]) -> None:
    """Handle a ``pull_request`` event payload.

    If no sync is known for the PR, a new downstream sync is created, but
    only when the event action is ``"opened"``. If a downstream or upstream
    sync already exists, the matching ``update_pr`` function is called while
    holding the sync's process lock. Any other kind of sync is ignored.

    :param git_gecko: Gecko repository.
    :param git_wpt: web-platform-tests repository.
    :param event: Event payload; must contain ``number``, ``action`` and
                  ``pull_request``. An optional ``_wptsync.repo_update`` flag
                  (default ``True``) is forwarded when creating a new sync.
    """
    newrelic.agent.set_transaction_name("handle_pr")
    pr_id = event["number"]
    action = event["action"]
    newrelic.agent.add_custom_parameter("pr", pr_id)
    newrelic.agent.add_custom_parameter("action", action)
    pull_request = event["pull_request"]
    env.gh_wpt.load_pull(pull_request)

    sync = get_pr_sync(git_gecko, git_wpt, pr_id)
    repo_update = event.get("_wptsync", {}).get("repo_update", True)

    if not sync:
        # If we don't know about this sync then it's a new thing that we should
        # set up state for
        # TODO: maybe want to create a new sync here irrespective of the event
        # type because we missed some events.
        if action == "opened":
            downstream.new_wpt_pr(git_gecko, git_wpt, pull_request,
                                  repo_update=repo_update)
        return

    # Pick the update routine matching the kind of sync we found.
    if isinstance(sync, downstream.DownstreamSync):
        update_func = downstream.update_pr
    elif isinstance(sync, upstream.UpstreamSync):
        update_func = upstream.update_pr
    else:
        return

    # Merge details are only meaningful once the PR has actually been merged.
    if pull_request["merged"]:
        merge_sha = pull_request["merge_commit_sha"]
    else:
        merge_sha = None
    if merge_sha:
        merged_by = pull_request["merged_by"]["login"]
    else:
        merged_by = None

    with SyncLock.for_process(sync.process_name) as lock:
        assert isinstance(lock, SyncLock)
        with sync.as_mut(lock):
            update_func(git_gecko,
                        git_wpt,
                        sync,
                        action,
                        merge_sha,
                        pull_request["base"]["sha"],
                        merged_by)
def handle_check_run(git_gecko: Repo, git_wpt: Repo, event: Dict[str, Any]) -> None:
    """Handle a ``check_run`` event payload.

    Only ``"completed"`` actions for checks listed in
    ``env.gh_wpt.required_checks("master")`` are processed. When the checked
    commit belongs to a PR that has an upstream sync,
    ``upstream.commit_check_changed`` is called under the sync's process lock.

    :param git_gecko: Gecko repository.
    :param git_wpt: web-platform-tests repository.
    :param event: Event payload; must contain ``action`` and, for completed
                  runs, ``check_run`` with ``head_sha``, ``name``, ``status``
                  and ``conclusion``. An optional ``_wptsync.repo_update``
                  flag (default ``True``) controls whether the wpt repository
                  is updated before looking up the sync.
    """
    newrelic.agent.set_transaction_name("handle_check_run")
    if event["action"] != "completed":
        return

    check_run = event["check_run"]
    head_sha = check_run["head_sha"]
    check_name = check_run["name"]
    newrelic.agent.add_custom_parameter("sha", head_sha)
    newrelic.agent.add_custom_parameter("name", check_name)
    newrelic.agent.add_custom_parameter("status", check_run["status"])
    newrelic.agent.add_custom_parameter("conclusion", check_run["conclusion"])

    if check_name not in env.gh_wpt.required_checks("master"):
        logger.info("Check %s is not required" % check_name)
        return

    # First check if the commit is part of any pull request
    pr_id = pr_for_commit(git_wpt, head_sha)
    if pr_id is None:
        # Report the commit we looked up; pr_id is always None on this path,
        # so logging it would carry no information.
        logger.info("Commit %s is not part of a PR" % head_sha)
        return

    repo_update = event.get("_wptsync", {}).get("repo_update", True)
    if repo_update:
        update_repositories(None, git_wpt)

    sync = get_pr_sync(git_gecko, git_wpt, pr_id)
    # Check status changes are only acted on for upstream syncs.
    if not isinstance(sync, upstream.UpstreamSync):
        return

    with SyncLock.for_process(sync.process_name) as lock:
        assert isinstance(lock, SyncLock)
        with sync.as_mut(lock):
            upstream.commit_check_changed(git_gecko,
                                          git_wpt,
                                          sync)
def handle_push(git_gecko: Repo, git_wpt: Repo, event: Dict[str, Any]) -> None:
    """Handle a ``push`` event payload.

    Calls ``update_repositories`` for the wpt repository only, then hands the
    ids of the commits listed in ``event["commits"]`` to ``landing.wpt_push``.

    :param git_gecko: Gecko repository.
    :param git_wpt: web-platform-tests repository.
    :param event: Event payload; must contain a ``commits`` list whose items
                  each have an ``id``.
    """
    newrelic.agent.set_transaction_name("handle_push")
    update_repositories(None, git_wpt)
    commit_ids = []
    for commit in event["commits"]:
        commit_ids.append(commit["id"])
    landing.wpt_push(git_gecko, git_wpt, commit_ids)
class GitHubHandler(Handler):
    """Dispatch a GitHub message to the function handling its event type."""

    # Maps the message's "event" field to the function that processes its payload.
    dispatch_event = {
        "pull_request": handle_pr,
        "check_run": handle_check_run,
        "push": handle_push,
    }

    def __call__(self, git_gecko: Repo, git_wpt: Repo, body: Dict[str, Any]) -> None:
        """Route ``body["payload"]`` to the handler for ``body["event"]``.

        Events without an entry in ``dispatch_event`` are logged and ignored.

        :param git_gecko: Gecko repository.
        :param git_wpt: web-platform-tests repository.
        :param body: Message with an ``event`` name and a ``payload`` dict.
        :returns: Whatever the selected handler returns (``None`` for the
                  handlers registered here).
        """
        newrelic.agent.set_transaction_name("GitHubHandler")
        event_name = body["event"]
        newrelic.agent.add_custom_parameter("event", event_name)
        # Use .get() so that an unmapped event yields None instead of raising
        # KeyError; a plain subscript would make the None check below dead code.
        handler = self.dispatch_event.get(event_name)
        if handler is not None:
            return handler(git_gecko, git_wpt, body["payload"])
        logger.info("No handler for GitHub event %s" % event_name)
        # TODO: other events to check if we can merge a PR
        # because of some update
class PushHandler(Handler):
    """Handle a message announcing a push to a gecko repository."""

    def __call__(self, git_gecko: Repo, git_wpt: Repo, body: Dict[str, Any]) -> None:
        """Update the repositories and run the upstream/landing push steps.

        :param git_gecko: Gecko repository.
        :param git_wpt: web-platform-tests repository.
        :param body: Message containing ``_meta.routing_key`` (the repository)
                     and ``payload.data.heads`` (pushed revisions). An
                     optional ``_wptsync`` dict may carry ``base_rev`` and
                     ``processes`` overrides.
        """
        newrelic.agent.set_transaction_name("PushHandler")
        repo = body["_meta"]["routing_key"]
        # Keep only the part after the last "/"; without a "/" this is the
        # whole routing key.
        repo_name = repo.rsplit("/", 1)[-1]

        # Commands can override the base rev and select only certain processes
        overrides = body.get("_wptsync", {})
        base_rev = overrides.get("base_rev")
        processes = overrides.get("processes")

        # Not sure if it's ever possible to get multiple heads here in a way that
        # matters for us
        rev = body["payload"]["data"]["heads"][0]
        logger.info(f"Handling commit {rev} to repo {repo}")

        newrelic.agent.add_custom_parameter("repo", repo)
        newrelic.agent.add_custom_parameter("rev", rev)

        update_repositories(git_gecko, git_wpt, wait_gecko_commit=rev)
        try:
            git_rev = cinnabar(git_gecko).hg2git(rev)
        except ValueError:
            # No git equivalent could be determined; carry on without the
            # tracked-branch check.
            pass
        else:
            commit = git_gecko.rev_parse(git_rev)
            if gecko_repo(git_gecko, commit) is None:
                logger.info("Skipping commit as it isn't in a branch we track")
                return

        # With no explicit selection both processes run, upstream first.
        for process_name, module in (("upstream", upstream), ("landing", landing)):
            if processes is None or process_name in processes:
                module.gecko_push(git_gecko, git_wpt, repo_name, rev, base_rev=base_rev)
class DecisionTaskHandler(Handler):
    """Handler for the message associated with a decision task completing."""

    # Task states that count as the decision task having finished.
    complete_states = frozenset(["completed", "failed", "exception"])

    def __call__(self, git_gecko: Repo, git_wpt: Repo, body: Dict[str, Any]) -> None:
        """Record the outcome of a decision task on its try push.

        On success the try push gets its taskgroup id. On failure the decision
        task is retriggered, unless the taskgroup already holds more than five
        "Gecko Decision Task" tasks, in which case the try push is marked as
        complete with an infrastructure failure and the sync's bug (if any)
        receives a comment.

        :param git_gecko: Gecko repository.
        :param git_wpt: web-platform-tests repository.
        :param body: Message with ``status`` (``taskId``, ``taskGroupId``,
                     ``state``, optionally ``runs``), ``task.tags.kind`` and,
                     for the ``exception`` state, ``runId``.
        :raises ValueError: If the task or its ``GECKO_HEAD_REV`` cannot be
                            found.
        :raises RetryableError: If no try push exists for the task's commit.
        """
        newrelic.agent.set_transaction_name("DecisionTaskHandler")
        task_id = body["status"]["taskId"]
        taskgroup_id = body["status"]["taskGroupId"]

        msg = "Expected kind decision-task, got %s" % body["task"]["tags"]["kind"]
        assert body["task"]["tags"]["kind"] == "decision-task", msg

        newrelic.agent.add_custom_parameter("tc_task", task_id)
        newrelic.agent.add_custom_parameter("tc_taskgroup", taskgroup_id)

        state = body["status"]["state"]

        newrelic.agent.add_custom_parameter("state", state)

        # Enforce the invariant that the taskgroup id is not set until
        # the decision task is complete. This allows us to determine if a
        # try push should have the expected wpt tasks just by checking if
        # this is set
        if state not in self.complete_states:
            logger.info("Decision task is not yet complete, status %s" % state)
            return

        task = tc.get_task(task_id)
        if task is None:
            raise ValueError("Failed to get task for task_id %s" % task_id)

        sha1 = task.get("payload", {}).get("env", {}).get("GECKO_HEAD_REV")

        if sha1 is None:
            raise ValueError("Failed to get commit sha1 from task message")

        if state == "exception":
            run_id = body["runId"]
            runs = body.get("status", {}).get("runs", [])
            if 0 <= run_id < len(runs):
                reason = runs[run_id].get("reasonResolved")
                if reason in ["superseded",
                              "claim-expired",
                              "worker-shutdown",
                              "intermittent-task"]:
                    logger.info("Task %s had an exception for reason %s, "
                                "assuming taskcluster will retry" %
                                (task_id, reason))
                    return

        try_push = trypush.TryPush.for_commit(git_gecko, sha1)
        if not try_push:
            logger.debug(f"No try push for SHA1 {sha1} taskId {task_id}")
            # This could be a race condition if the decision task completes before this
            # task is in the index
            raise RetryableError("Got a wptsync task with no corresponding try push")

        with SyncLock.for_process(try_push.process_name) as lock:
            assert isinstance(lock, SyncLock)
            with try_push.as_mut(lock):
                # If we retrigger, we create a new taskgroup, with id equal to the new task_id.
                # But the retriggered decision task itself is still in the original taskgroup
                if state == "completed":
                    logger.info("Setting taskgroup id for try push %r to %s" %
                                (try_push, taskgroup_id))
                    try_push.taskgroup_id = taskgroup_id
                elif state in ("failed", "exception"):
                    sync = try_push.sync(git_gecko, git_wpt)
                    message = ("Decision task got status %s for task %s%s" %
                               (state, sha1, " PR %s" % sync.pr if sync and sync.pr else ""))
                    logger.error(message)
                    taskgroup = tc.TaskGroup(task["taskGroupId"])
                    if len(taskgroup.view(
                            lambda x: x["task"]["metadata"]["name"] == "Gecko Decision Task")) > 5:
                        # Too many decision tasks in this group already: give up
                        # and record the failure rather than retriggering again.
                        try_push.status = "complete"
                        try_push.infra_fail = True
                        try_push.taskgroup_id = taskgroup_id
                        if sync and sync.bug:
                            env.bz.comment(
                                sync.bug,
                                "Try push failed: decision task %s returned error" % task_id)
                    else:
                        # The try push may have no sync (it is guarded as
                        # optional above), so don't dereference it blindly;
                        # doing so would abort before the retrigger happens.
                        logger.info("Retriggering decision task for sync %s" %
                                    (sync.process_name if sync else None,))
                        client = tc.TaskclusterClient()
                        client.retrigger(task_id)
class TryTaskHandler(Handler):
    """Handler for the task associated with a try push task completing."""

    def __call__(self, git_gecko: Repo, git_wpt: Repo, body: Dict[str, Any]) -> None:
        """Check whether the try push owning this task's taskgroup is finished.

        Messages for taskgroups without a try push, or whose try push is
        already ``"complete"``, are ignored.

        :param git_gecko: Gecko repository.
        :param git_wpt: web-platform-tests repository.
        :param body: Message containing ``status.taskGroupId``.
        """
        newrelic.agent.set_transaction_name("TryTaskHandler")
        taskgroup_id = body["status"]["taskGroupId"]

        newrelic.agent.add_custom_parameter("tc_taskgroup", taskgroup_id)

        try_push = trypush.TryPush.for_taskgroup(git_gecko, taskgroup_id)
        if not try_push:
            # this is not one of our try_pushes
            logger.debug("No try push for taskgroup %s" % taskgroup_id)
            return
        if try_push.status == "complete":
            return
        logger.info("Found try push for taskgroup %s" % taskgroup_id)

        if try_push.taskgroup_id is None:
            # Store the taskgroup id on the try push before inspecting its tasks.
            with SyncLock.for_process(try_push.process_name) as lock:
                assert isinstance(lock, SyncLock)
                with try_push.as_mut(lock):
                    try_push.taskgroup_id = taskgroup_id

        # Check if the taskgroup has all tasks complete, excluding unscheduled tasks.
        # This allows us to tell if the taskgroup is complete (per the treeherder definition)
        # even when there are tasks that won't be scheduled because the task they depend on
        # failed. Otherwise we'd have to wait until those unscheduled tasks time out, which
        # usually takes 24hr
        if try_push.tasks().complete(allow_unscheduled=True):
            taskgroup_complete(git_gecko, git_wpt, taskgroup_id, try_push)
class TaskGroupHandler(Handler):
    """Handle a message that refers to a whole taskgroup."""

    def __call__(self, git_gecko: Repo, git_wpt: Repo, body: Dict[str, Any]) -> None:
        """Run ``taskgroup_complete`` for the try push owning the taskgroup.

        Taskgroups that do not belong to one of our try pushes are ignored.

        :param git_gecko: Gecko repository.
        :param git_wpt: web-platform-tests repository.
        :param body: Message containing ``taskGroupId``; the id is passed
                     through ``tc.normalize_task_id`` before use.
        """
        newrelic.agent.set_transaction_name("TaskGroupHandler")
        taskgroup_id = tc.normalize_task_id(body["taskGroupId"])

        newrelic.agent.add_custom_parameter("tc_task", taskgroup_id)

        try_push = trypush.TryPush.for_taskgroup(git_gecko, taskgroup_id)
        if try_push:
            logger.info("Found try push for taskgroup %s" % taskgroup_id)
            taskgroup_complete(git_gecko, git_wpt, taskgroup_id, try_push)
        else:
            # this is not one of our try_pushes
            logger.debug("No try push for taskgroup %s" % taskgroup_id)
def taskgroup_complete(git_gecko: Repo, git_wpt: Repo, taskgroup_id: str,
                       try_push: trypush.TryPush) -> None:
    """Notify the sync owning ``try_push`` that its taskgroup has finished.

    :param git_gecko: Gecko repository.
    :param git_wpt: web-platform-tests repository.
    :param taskgroup_id: Id of the taskgroup that completed.
    :param try_push: Try push that was looked up for ``taskgroup_id``.
    :raises ValueError: If ``try_push`` already records a different
                        taskgroup id.
    """
    sync = try_push.sync(git_gecko, git_wpt)
    if not sync:
        # Nothing to update without a sync; record the anomaly and stop.
        sync_missing = {
            "taskgroup-id": taskgroup_id,
            "try_push": try_push,
        }
        newrelic.agent.record_custom_event("taskgroup_sync_missing", params=sync_missing)
        return

    with SyncLock.for_process(sync.process_name) as lock:
        assert isinstance(lock, SyncLock)
        with sync.as_mut(lock), try_push.as_mut(lock):
            # We sometimes see the taskgroup ID being None. If it isn't set but found via its
            # taskgroup ID, it is safe to set it here.
            if try_push.taskgroup_id is None:
                logger.info("Try push for taskgroup %s does not have its ID set, setting now" %
                            taskgroup_id)
                try_push.taskgroup_id = taskgroup_id  # type: ignore
                id_missing = {
                    "taskgroup-id": taskgroup_id,
                    "try_push": try_push,
                    "sync": sync,
                }
                newrelic.agent.record_custom_event("taskgroup_id_missing", params=id_missing)
            elif try_push.taskgroup_id != taskgroup_id:
                msg = ("TryPush %s, expected taskgroup ID %s, found %s instead" %
                       (try_push, taskgroup_id, try_push.taskgroup_id))
                logger.error(msg)
                exc = ValueError(msg)
                newrelic.agent.record_exception(exc=exc)
                raise exc

            if not sync:
                return

            logger.info("Updating try push for sync %r" % sync)
            # Only downstream and landing syncs react to a finished try push.
            if isinstance(sync, downstream.DownstreamSync):
                downstream.try_push_complete(git_gecko, git_wpt, try_push, sync)
            elif isinstance(sync, landing.LandingSync):
                landing.try_push_complete(git_gecko, git_wpt, try_push, sync)
class LandingHandler(Handler):
    """Handler that triggers a landing update."""

    def __call__(self, git_gecko: Repo, git_wpt: Repo, body: Dict[str, Any]) -> None:
        """Call ``landing.update_landing``; ``body`` is not inspected.

        :param git_gecko: Gecko repository.
        :param git_wpt: web-platform-tests repository.
        :param body: Message body (unused).
        """
        newrelic.agent.set_transaction_name("LandingHandler")

        landing.update_landing(git_gecko, git_wpt)
class CleanupHandler(Handler):
    """Handler that runs the cleanup routines."""

    def __call__(self, git_gecko: Repo, git_wpt: Repo, body: Dict[str, Any]) -> None:
        """Run ``worktree.cleanup`` followed by ``tc.cleanup``.

        :param git_gecko: Gecko repository.
        :param git_wpt: web-platform-tests repository.
        :param body: Message body (unused).
        """
        newrelic.agent.set_transaction_name("CleanupHandler")

        logger.info("Running cleanup")
        # Worktree cleanup runs first, then the tc module's cleanup.
        worktree.cleanup(git_gecko, git_wpt)
        tc.cleanup()
class RetriggerHandler(Handler):
    """Handler that retriggers the syncs reported as unlanded."""

    def __call__(self, git_gecko: Repo, git_wpt: Repo, body: Dict[str, Any]) -> None:
        """Update both repositories, then retrigger everything unlanded.

        The unlanded set is computed by ``landing.unlanded_with_type`` from
        the ``"upstream"`` entry of the loaded sync point.

        :param git_gecko: Gecko repository.
        :param git_wpt: web-platform-tests repository.
        :param body: Message body (unused).
        """
        newrelic.agent.set_transaction_name("RetriggerHandler")

        logger.info("Running retrigger")
        update_repositories(git_gecko, git_wpt)
        sync_point = landing.load_sync_point(git_gecko, git_wpt)
        unlanded = landing.unlanded_with_type(
            git_gecko, git_wpt, None, sync_point["upstream"])
        update.retrigger(git_gecko, git_wpt, unlanded)
class PhabricatorHandler(Handler):
    """Handler for Phabricator events; currently it only logs them."""

    def __call__(self, git_gecko: Repo, git_wpt: Repo, body: Dict[str, Any]) -> None:
        """Log the received ``body`` and take no further action.

        :param git_gecko: Gecko repository (unused).
        :param git_wpt: web-platform-tests repository (unused).
        :param body: Message body, included in the log line.
        """
        newrelic.agent.set_transaction_name("PhabricatorHandler")

        message = "Got phab event, doing nothing: %s" % body
        logger.info(message)
class BugUpdateHandler(Handler):
    """Handler that triggers the triage-bug update."""

    def __call__(self, git_gecko: Repo, git_wpt: Repo, body: Dict[str, Any]) -> None:
        """Call ``bugupdate.update_triage_bugs`` for the gecko repository.

        :param git_gecko: Gecko repository.
        :param git_wpt: web-platform-tests repository (unused).
        :param body: Message body (unused).
        """
        newrelic.agent.set_transaction_name("BugUpdateHandler")

        logger.info("Running bug update")
        bugupdate.update_triage_bugs(git_gecko)