in sync/listen.py [0:0]
def run_pulse_listener(config: Dict[str, Any]) -> None:
    """
    Configures Pulse connection and triggers events from Pulse messages.

    Connection details are managed at https://pulseguardian.mozilla.org/.

    :param config: Configuration mapping. ``config['pulse']`` must provide
                   ``host``, ``port``, ``ssl``, ``username`` and ``password``
                   for the connection. Any entry under ``pulse`` whose value
                   is a dict with exactly the keys ``queue``, ``exchange``
                   and ``routing_key`` is treated as a queue to listen on;
                   the entry's name selects the filter used as the callback
                   for that queue's exchange.
    :raises KeyError: If a required connection setting is missing, or if a
                      configured queue name has no associated filter.
    """
    pulse_config = config['pulse']

    # Maps the name of a queue entry in config['pulse'] to the filter class
    # that is instantiated and registered as the callback for its exchange.
    filter_map = {
        'github': GitHubFilter,
        'hgmo': PushFilter,
        'taskcluster-taskgroup': TaskGroupFilter,
        'taskcluster-try-completed': DecisionTaskFilter,
        'taskcluster-try-failed': DecisionTaskFilter,
        'taskcluster-try-exception': DecisionTaskFilter,
        'taskcluster-wptsync-completed': TryTaskFilter,
        'taskcluster-wptsync-failed': TryTaskFilter,
        'taskcluster-wptsync-exception': TryTaskFilter,
    }

    # config['pulse'] mixes connection settings with queue definitions; only
    # dict entries with exactly these three keys describe a queue.
    queue_keys = {"queue", "exchange", "routing_key"}
    queues = {
        queue_name: queue_props
        for queue_name, queue_props in pulse_config.items()
        if isinstance(queue_props, dict) and set(queue_props.keys()) == queue_keys
    }

    # Fail fast, before any connection work, with a message that names the
    # misconfigured queues instead of a bare KeyError from the lookup later.
    unknown_queues = [name for name in queues if name not in filter_map]
    if unknown_queues:
        raise KeyError("No pulse message filter defined for queue(s): %r"
                       % (unknown_queues,))

    exchanges = []
    for queue in queues.values():
        logger.info("Connecting to pulse queue:%(queue)s exchange:%(exchange)s"
                    " route:%(routing_key)s" % queue)
        exchanges.append((queue['queue'],
                          queue['exchange'],
                          queue['routing_key']))

    conn = kombu.Connection(hostname=pulse_config['host'],
                            port=pulse_config['port'],
                            ssl=pulse_config['ssl'],
                            userid=pulse_config['username'],
                            password=pulse_config['password'])

    listen_logger = get_listen_logger(config)

    with conn:
        try:
            listener = get_listener(conn,
                                    userid=pulse_config['username'],
                                    exchanges=exchanges,
                                    logger=listen_logger)
            for queue_name, queue in queues.items():
                queue_filter = filter_map[queue_name](config, listen_logger)
                listener.add_callback(queue['exchange'], queue_filter)
            listener.run()
        except KeyboardInterrupt:
            # Deliberately swallowed: an interrupt ends the listener without
            # propagating, and the enclosing ``with conn`` block still exits
            # the connection context.
            pass