def run_pulse_listener()

in sync/listen.py [0:0]


def run_pulse_listener(config: Dict[str, Any]) -> None:
    """
    Connect to Pulse and dispatch incoming messages to per-queue filters.

    Every entry under ``config['pulse']`` whose value is a dict with exactly
    the keys ``queue``, ``exchange`` and ``routing_key`` is treated as a queue
    definition. The remaining entries that are read here (``host``, ``port``,
    ``ssl``, ``username``, ``password``) are the connection settings.

    Each queue definition's name selects the filter class that is registered
    as the callback for that queue's exchange; a name with no entry in the
    filter table raises ``KeyError``. The call then hands control to
    ``listener.run()``; a ``KeyboardInterrupt`` raised while setting up or
    running the listener is swallowed so the function returns normally.

    Connection details are managed at https://pulseguardian.mozilla.org/.

    :param config: Sync configuration; must contain a ``'pulse'`` mapping.
    """
    pulse_config = config['pulse']

    # Only entries shaped exactly like a queue definition are kept; plain
    # settings such as host/port are not dicts and drop out here.
    required_keys = {"queue", "exchange", "routing_key"}
    queue_defs = {
        name: props
        for name, props in pulse_config.items()
        if isinstance(props, dict) and set(props.keys()) == required_keys
    }

    connect_msg = ("Connecting to pulse queue:%(queue)s exchange:%(exchange)s"
                   " route:%(routing_key)s")
    for props in queue_defs.values():
        logger.info(connect_msg % props)

    # (queue, exchange, routing_key) triples, in the same order as queue_defs.
    exchange_specs = [
        (props['queue'], props['exchange'], props['routing_key'])
        for props in queue_defs.values()
    ]

    connection = kombu.Connection(
        hostname=pulse_config['host'],
        port=pulse_config['port'],
        ssl=pulse_config['ssl'],
        userid=pulse_config['username'],
        password=pulse_config['password'],
    )

    listen_logger = get_listen_logger(config)

    # Queue name -> filter class used as the callback for that queue.
    filter_classes = {
        'github': GitHubFilter,
        'hgmo': PushFilter,
        'taskcluster-taskgroup': TaskGroupFilter,
    }
    filter_classes.update(dict.fromkeys(
        ('taskcluster-try-completed',
         'taskcluster-try-failed',
         'taskcluster-try-exception'),
        DecisionTaskFilter,
    ))
    filter_classes.update(dict.fromkeys(
        ('taskcluster-wptsync-completed',
         'taskcluster-wptsync-failed',
         'taskcluster-wptsync-exception'),
        TryTaskFilter,
    ))

    with connection:
        try:
            pulse_listener = get_listener(
                connection,
                userid=pulse_config['username'],
                exchanges=exchange_specs,
                logger=listen_logger,
            )
            for name, props in queue_defs.items():
                callback = filter_classes[name](config, listen_logger)
                pulse_listener.add_callback(props['exchange'], callback)

            pulse_listener.run()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the listener; exit quietly.
            pass