events/code_coverage_events/workflow.py (128 lines of code) (raw):

# -*- coding: utf-8 -*-

import asyncio

import requests
import structlog
from libmozevent.bus import MessageBus
from libmozevent.monitoring import Monitoring
from libmozevent.pulse import PulseListener
from libmozevent.utils import run_tasks

from code_coverage_events import QUEUE_MONITORING
from code_coverage_events import QUEUE_PULSE
from code_coverage_events import taskcluster_config

logger = structlog.get_logger(__name__)


class CodeCoverage(object):
    """
    Taskcluster hook handling the code coverage

    Consumes task-group-resolved payloads from the QUEUE_PULSE queue, looks
    for a coverage ("ccov") task in the resolved group and, when one is found
    on a supported repository, triggers the configured hook once per group.
    """

    def __init__(self, hook_id, hook_group_id, bus):
        """
        Store the hook identifiers and the shared message bus, and load the
        Taskcluster "queue" and "hooks" services from the shared configuration
        """
        # Task groups for which a coverage task was already found; used to
        # ignore duplicated notifications.
        # NOTE(review): nothing in this module ever removes entries from this
        # set, so it grows for the lifetime of the process.
        self.triggered_groups = set()
        self.group_id = hook_group_id
        self.hook_id = hook_id
        self.bus = bus

        # Setup TC services
        self.queue = taskcluster_config.get_service("queue")
        self.hooks = taskcluster_config.get_service("hooks")

    async def run(self):
        """
        Main consumer, running queued payloads from the pulse listener

        Loops forever: every payload received on QUEUE_PULSE is parsed, and
        each environment returned by parse() triggers one hook run whose task
        id is then forwarded to QUEUE_MONITORING.
        """
        while True:
            # Get next payload from pulse messages
            payload = await self.bus.receive(QUEUE_PULSE)

            # Parse the payload to extract a new task's environment
            envs = await self.parse(payload["body"])
            if envs is None:
                continue

            for env in envs:
                # Trigger new tasks
                # (this call is not awaited: it runs synchronously here)
                task = self.hooks.triggerHook(self.group_id, self.hook_id, env)
                task_id = task["status"]["taskId"]
                logger.info("Triggered a new code coverage task", id=task_id)

                # Send task to monitoring
                await self.bus.send(
                    QUEUE_MONITORING, (self.group_id, self.hook_id, task_id)
                )

    def is_coverage_task(self, task):
        """
        Tell whether a task is a coverage task

        The task name is cut at its first "/" and that prefix is split on "-";
        the task is a coverage task when one of the resulting parts is exactly
        "ccov".
        """
        return "ccov" in task["task"]["metadata"]["name"].split("/")[0].split("-")

    async def get_coverage_task_in_group(self, group_id):
        """
        Find the first coverage task of a task group

        Returns the task description, or None when the group was already
        handled, when it holds no coverage task, or when listing its tasks
        failed with an HTTP error.
        """
        if group_id in self.triggered_groups:
            logger.info(
                "Received duplicated groupResolved notification", group=group_id
            )
            return None

        def maybe_trigger(tasks):
            """
            Return the first coverage task of a page of tasks (or None), and
            mark the group as handled when one is found
            """
            logger.info(
                "Checking code coverage tasks", group_id=group_id, nb=len(tasks)
            )
            for task in tasks:
                if self.is_coverage_task(task):
                    self.triggered_groups.add(group_id)
                    return task

            return None

        def load_tasks(limit=200, continuationToken=None):
            """
            Load one page of the group's tasks; returns a tuple made of the
            coverage task found in that page (or None) and the token of the
            next page (or None when the reply carries none)
            """
            query = {"limit": limit}
            if continuationToken is not None:
                query["continuationToken"] = continuationToken
            reply = self.queue.listTaskGroup(group_id, query=query)
            return maybe_trigger(reply["tasks"]), reply.get("continuationToken")

        async def retrieve_coverage_task():
            """
            Walk through the pages of the group until a coverage task is found
            or no continuation token is left
            """
            task, token = load_tasks()

            while task is None and token is not None:
                task, token = load_tasks(continuationToken=token)

                # Let other tasks run on long batches
                # (load_tasks is a plain synchronous call, so this is the only
                # point where the event loop gets control back between pages)
                await asyncio.sleep(0)

            return task

        try:
            return await retrieve_coverage_task()
        except requests.exceptions.HTTPError as e:
            # Keep the best-effort behaviour (the group is skipped), but leave
            # a trace so that a failed listing can be told apart from a group
            # without any coverage task
            logger.warning(
                "Failed to list tasks in group", group_id=group_id, error=str(e)
            )
            return None

    async def parse(self, body):
        """
        Extract revisions from payload

        Returns a list holding a single {"REPOSITORY": ..., "REVISION": ...}
        environment, or None when the notification must be ignored
        (unsupported scheduler, no coverage task in the group, or coverage
        task on an unexpected repository).
        """
        taskGroupId = body["taskGroupId"]
        scheduler = body["schedulerId"]

        # Check the scheduler name before loading all tasks in the group
        # We are only interested in Mozilla gecko builds
        if not scheduler.startswith("gecko-level-"):
            logger.info(
                "Skipping task, unsupported scheduler",
                group_id=taskGroupId,
                scheduler=scheduler,
            )
            return None

        coverage_task = await self.get_coverage_task_in_group(taskGroupId)
        if coverage_task is None:
            return None

        # Repository and revision are read from the coverage task environment
        task_env = coverage_task["task"]["payload"]["env"]

        repository = task_env["GECKO_HEAD_REPOSITORY"]
        if repository not in (
            "https://hg.mozilla.org/mozilla-central",
            "https://hg.mozilla.org/try",
        ):
            logger.warning(
                "Received groupResolved notification for a coverage task in an unexpected branch",
                repository=repository,
            )
            return None

        revision = task_env["GECKO_HEAD_REV"]

        logger.info(
            "Received groupResolved notification for coverage builds",
            repository=repository,
            revision=revision,
            group=taskGroupId,
        )

        return [{"REPOSITORY": repository, "REVISION": revision}]


class Events(object):
    """
    Listen to pulse events and trigger new code coverage tasks
    """

    def __init__(self):
        """
        Build the message bus and register the code coverage workflow, the
        monitoring and the pulse listener on it, using the secrets from the
        shared Taskcluster configuration
        """
        # Create message bus shared amongst process
        self.bus = MessageBus()

        # Build code coverage workflow
        self.workflow = CodeCoverage(
            taskcluster_config.secrets["hook_id"],
            taskcluster_config.secrets["hook_group_id"],
            self.bus,
        )

        # Setup monitoring for newly created tasks
        # NOTE(review): 7 * 3600 is presumably a period in seconds (7 hours);
        # confirm against the libmozevent Monitoring signature
        self.monitoring = Monitoring(
            taskcluster_config,
            QUEUE_MONITORING,
            taskcluster_config.secrets["admins"],
            7 * 3600,
        )
        self.monitoring.register(self.bus)

        # Create pulse listener for code coverage
        # Every routing key ("#") of the task-group-resolved exchange is
        # routed to QUEUE_PULSE
        self.pulse = PulseListener(
            {
                QUEUE_PULSE: [
                    ("exchange/taskcluster-queue/v1/task-group-resolved", ["#"])
                ]
            },
            taskcluster_config.secrets["pulse_user"],
            taskcluster_config.secrets["pulse_password"],
        )
        self.pulse.register(self.bus)

    def run(self):
        """
        Run the workflow, the monitoring and the pulse listener together
        through libmozevent's run_tasks
        """
        consumers = [
            # Code coverage main workflow
            self.workflow.run(),
            # Add monitoring task
            self.monitoring.run(),
            # Add pulse task
            self.pulse.run(),
        ]

        # Run all tasks concurrently
        run_tasks(consumers)