in libmozevent/pulse.py [0:0]
def find_matching_queues(
    self, payload_exchange: str, payload_routes: List[bytes]
) -> set:
    """
    Detect all the bus queues whose routing matches the current payload.

    A bus queue is selected when at least one of its (exchange, routes)
    bindings stored in ``self.queues_routes`` has an exchange strictly
    equal to ``payload_exchange`` and a route pattern that matches at
    least one entry of ``payload_routes``.

    Route patterns are matched with ``fnmatch`` after turning every ``#``
    into ``*``: ``*`` therefore matches any run of characters (dots
    included) and ``?`` / ``[...]`` keep their glob meaning.
    NOTE(review): this looks looser than AMQP topic matching, where ``*``
    is expected to cover a single word only - confirm this is intended.

    :param payload_exchange: exchange the payload was received on
    :param payload_routes: routing keys of the payload, as bytes
    :return: set of the keys of ``self.queues_routes`` that match;
             empty when nothing matches or ``payload_routes`` is empty
    """

    def _route_matches(route: str) -> bool:
        # One of the payload routes must match the current route.
        # The pattern is encoded so fnmatch compares bytes with bytes.
        pattern = route.replace("#", "*").encode("utf-8")
        # any() stops at the first matching payload route instead of
        # collecting every match just to check the list is not empty
        return any(fnmatch.fnmatch(name, pattern) for name in payload_routes)

    return {
        bus_queue
        for bus_queue, queue_routes in self.queues_routes.items()
        for pulse_exchange, pulse_routes in queue_routes
        # Exchanges must match exactly; tested once per binding so the
        # routes of bindings on other exchanges are never inspected
        if pulse_exchange == payload_exchange
        for pulse_route in pulse_routes
        if _route_matches(pulse_route)
    }