events/code_coverage_events/workflow.py (128 lines of code) (raw):
# -*- coding: utf-8 -*-
import asyncio
import requests
import structlog
from libmozevent.bus import MessageBus
from libmozevent.monitoring import Monitoring
from libmozevent.pulse import PulseListener
from libmozevent.utils import run_tasks
from code_coverage_events import QUEUE_MONITORING
from code_coverage_events import QUEUE_PULSE
from code_coverage_events import taskcluster_config
logger = structlog.get_logger(__name__)
class CodeCoverage(object):
"""
Taskcluster hook handling the code coverage
"""
def __init__(self, hook_id, hook_group_id, bus):
self.triggered_groups = set()
self.group_id = hook_group_id
self.hook_id = hook_id
self.bus = bus
# Setup TC services
self.queue = taskcluster_config.get_service("queue")
self.hooks = taskcluster_config.get_service("hooks")
async def run(self):
"""
Main consumer, running queued payloads from the pulse listener
"""
while True:
# Get next payload from pulse messages
payload = await self.bus.receive(QUEUE_PULSE)
# Parse the payload to extract a new task's environment
envs = await self.parse(payload["body"])
if envs is None:
continue
for env in envs:
# Trigger new tasks
task = self.hooks.triggerHook(self.group_id, self.hook_id, env)
task_id = task["status"]["taskId"]
logger.info("Triggered a new code coverage task", id=task_id)
# Send task to monitoring
await self.bus.send(
QUEUE_MONITORING, (self.group_id, self.hook_id, task_id)
)
def is_coverage_task(self, task):
return "ccov" in task["task"]["metadata"]["name"].split("/")[0].split("-")
async def get_coverage_task_in_group(self, group_id):
if group_id in self.triggered_groups:
logger.info(
"Received duplicated groupResolved notification", group=group_id
)
return None
def maybe_trigger(tasks):
logger.info(
"Checking code coverage tasks", group_id=group_id, nb=len(tasks)
)
for task in tasks:
if self.is_coverage_task(task):
self.triggered_groups.add(group_id)
return task
return None
def load_tasks(limit=200, continuationToken=None):
query = {"limit": limit}
if continuationToken is not None:
query["continuationToken"] = continuationToken
reply = self.queue.listTaskGroup(group_id, query=query)
return maybe_trigger(reply["tasks"]), reply.get("continuationToken")
async def retrieve_coverage_task():
task, token = load_tasks()
while task is None and token is not None:
task, token = load_tasks(continuationToken=token)
# Let other tasks run on long batches
await asyncio.sleep(0)
return task
try:
return await retrieve_coverage_task()
except requests.exceptions.HTTPError:
return None
async def parse(self, body):
"""
Extract revisions from payload
"""
taskGroupId = body["taskGroupId"]
scheduler = body["schedulerId"]
# Check the scheduler name before loading all tasks in the group
# We are only interested in Mozilla gecko builds
if not scheduler.startswith("gecko-level-"):
logger.info(
"Skipping task, unsupported scheduler",
group_id=taskGroupId,
scheduler=scheduler,
)
return None
coverage_task = await self.get_coverage_task_in_group(taskGroupId)
if coverage_task is None:
return None
repository = coverage_task["task"]["payload"]["env"]["GECKO_HEAD_REPOSITORY"]
if repository not in [
"https://hg.mozilla.org/mozilla-central",
"https://hg.mozilla.org/try",
]:
logger.warn(
"Received groupResolved notification for a coverage task in an unexpected branch",
repository=repository,
)
return None
revision = coverage_task["task"]["payload"]["env"]["GECKO_HEAD_REV"]
logger.info(
"Received groupResolved notification for coverage builds",
repository=repository,
revision=revision,
group=taskGroupId,
)
return [{"REPOSITORY": repository, "REVISION": revision}]
class Events(object):
"""
Listen to pulse events and trigger new code coverage tasks
"""
def __init__(self):
# Create message bus shared amongst process
self.bus = MessageBus()
# Build code coverage workflow
self.workflow = CodeCoverage(
taskcluster_config.secrets["hook_id"],
taskcluster_config.secrets["hook_group_id"],
self.bus,
)
# Setup monitoring for newly created tasks
self.monitoring = Monitoring(
taskcluster_config,
QUEUE_MONITORING,
taskcluster_config.secrets["admins"],
7 * 3600,
)
self.monitoring.register(self.bus)
# Create pulse listener for code coverage
self.pulse = PulseListener(
{
QUEUE_PULSE: [
("exchange/taskcluster-queue/v1/task-group-resolved", ["#"])
]
},
taskcluster_config.secrets["pulse_user"],
taskcluster_config.secrets["pulse_password"],
)
self.pulse.register(self.bus)
def run(self):
consumers = [
# Code coverage main workflow
self.workflow.run(),
# Add monitoring task
self.monitoring.run(),
# Add pulse task
self.pulse.run(),
]
# Run all tasks concurrently
run_tasks(consumers)