sync/listen.py (175 lines of code) (raw):
import abc
import json
import logging
import os
import urllib.parse
import kombu
from kombu.mixins import ConsumerMixin
from . import log
from . import handlers
from . import tasks
from typing import Any, Dict, Optional
here = os.path.dirname(__file__)
logger = log.get_logger(__name__)
def get_listen_logger(config):
logger = logging.getLogger(__name__)
log_dir = os.path.join(config["root"],
config["paths"]["logs"])
if not os.path.exists(log_dir):
os.makedirs(log_dir)
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
basic_formatter = logging.Formatter('[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
file_handler = logging.handlers.TimedRotatingFileHandler(os.path.join(log_dir, "listen.log"),
when="D", utc=True)
file_handler.setFormatter(basic_formatter)
logger.addHandler(file_handler)
logger.propagate = False
return logger
class Listener(ConsumerMixin):
"""Manages a single kombu.Consumer."""
def __init__(self, conn, exchanges, queues, logger):
self.connection = conn
self._callbacks = {item: [] for item in exchanges}
self._queues = queues
self.connect_max_retries = 10
self.logger = logger
def get_consumers(self, Consumer, channel):
consumer = Consumer(self._queues, callbacks=[self.on_message], auto_declare=False)
return [consumer]
def on_connection_revived(self):
logger.debug("Connection to %s revived." % self.connection.hostname)
def add_callback(self, exchange, func):
if exchange is None:
raise ValueError("Expected string, got None")
self._callbacks[exchange].append(func)
def on_message(self, body, message):
exchange = message.delivery_info['exchange']
callbacks = self._callbacks.get(exchange)
try:
if callbacks:
for cb in callbacks:
cb(body)
else:
raise Exception('received message from unknown exchange: %s' %
exchange)
finally:
message.ack()
def get_listener(conn, userid, exchanges=None, extra_data=None, logger=None):
"""Obtain a Pulse consumer that can handle received messages.
Returns a ``Listener`` instance bound to listen to the requested exchanges.
Callers should use ``add_callback`` to register functions
that will be called when a message is received.
The callback functions receive one argument ``body``, the decoded message body.
"""
queues = []
if exchanges is None:
raise ValueError("No exchanges supplied")
for queue_name, exchange_name, key_name in exchanges:
queue_name = f'queue/{userid}/{queue_name}'
exchange = kombu.Exchange(exchange_name, type='topic',
channel=conn)
exchange.declare(passive=True)
queue = kombu.Queue(name=queue_name,
exchange=exchange,
durable=True,
routing_key=key_name,
exclusive=False,
auto_delete=False,
channel=conn,
extra_data=extra_data)
queues.append(queue)
# queue.declare() declares the exchange, which isn't allowed by the
# server. So call the low-level APIs to only declare the queue itself.
queue.queue_declare()
queue.queue_bind()
return Listener(conn, [item[1] for item in exchanges], queues, logger)
def run_pulse_listener(config: Dict[str, Any]) -> None:
"""
Configures Pulse connection and triggers events from Pulse messages.
Connection details are managed at https://pulseguardian.mozilla.org/.
"""
exchanges = []
queues = {}
for queue_name, queue_props in config['pulse'].items():
if (isinstance(queue_props, dict) and
set(queue_props.keys()) == {"queue", "exchange", "routing_key"}):
queues[queue_name] = queue_props
for queue in queues.values():
logger.info("Connecting to pulse queue:%(queue)s exchange:%(exchange)s"
" route:%(routing_key)s" % queue)
exchanges.append((queue['queue'],
queue['exchange'],
queue['routing_key']))
conn = kombu.Connection(hostname=config['pulse']['host'],
port=config['pulse']['port'],
ssl=config['pulse']['ssl'],
userid=config['pulse']['username'],
password=config['pulse']['password'])
listen_logger = get_listen_logger(config)
filter_map = {
'github': GitHubFilter,
'hgmo': PushFilter,
'taskcluster-taskgroup': TaskGroupFilter,
'taskcluster-try-completed': DecisionTaskFilter,
'taskcluster-try-failed': DecisionTaskFilter,
'taskcluster-try-exception': DecisionTaskFilter,
'taskcluster-wptsync-completed': TryTaskFilter,
'taskcluster-wptsync-failed': TryTaskFilter,
'taskcluster-wptsync-exception': TryTaskFilter,
}
with conn:
try:
listener = get_listener(conn,
userid=config['pulse']['username'],
exchanges=exchanges,
logger=listen_logger)
for queue_name, queue in queues.items():
queue_filter = filter_map[queue_name](config, listen_logger)
listener.add_callback(queue['exchange'], queue_filter)
listener.run()
except KeyboardInterrupt:
pass
return None
class Filter(metaclass=abc.ABCMeta):
name: Optional[str] = None
task = tasks.handle
def __init__(self, config, logger):
self.config = config
self.logger = logger
def __call__(self, body):
if self.accept(body):
self.logger.info("Message accepted %s" % self.name)
self.logger.debug(json.dumps(body))
self.task.apply_async((self.name, body))
def accept(self, body: Dict[str, Any]) -> bool:
raise NotImplementedError
class GitHubFilter(Filter):
name = "github"
event_filters = {item: lambda x: True
for item in handlers.GitHubHandler.dispatch_event.keys()}
event_filters["check_run"] = lambda x: x["payload"]["action"] == "completed"
event_filters["push"] = lambda x: x["payload"]["ref"] == "refs/heads/master"
def __init__(self, config, logger):
super().__init__(config, logger)
repo_path = urllib.parse.urlparse(config["web-platform-tests"]["repo"]["url"]).path
self.key_filter = "%s/" % repo_path.split("/", 2)[1]
def accept(self, body):
return (body["_meta"]["routing_key"].startswith(self.key_filter) and
body["event"] in self.event_filters and
self.event_filters[body["event"]](body))
class PushFilter(Filter):
name = "push"
def __init__(self, config, logger):
super().__init__(config, logger)
self.repos = set(config["gecko"]["repo"].keys())
def accept(self, body):
# Check that this has some commits pushed
if not body["payload"].get("data", {}).get("pushlog_pushes"):
return False
repo = body["_meta"]["routing_key"]
if "/" in repo:
repo = repo.rsplit("/", 1)[1]
return repo in self.repos
class TaskGroupFilter(Filter):
name = "taskgroup"
def accept(self, body):
return body.get("taskGroupId") is not None
class DecisionTaskFilter(Filter):
name = "decision-task"
def accept(self, body):
return is_decision_task(body)
class TryTaskFilter(Filter):
name = "try-task"
def accept(self, body):
return not is_decision_task(body)
def is_decision_task(body):
tags = body.get("task", {}).get("tags", {})
return (tags.get("kind") == "decision-task" and
tags.get("createdForUser") == "wptsync@mozilla.com")