in qpid_tests/broker_0_10/query.py [0:0]
def exchange_bound_with_key(self, exchange_name):
session = self.session
#setup: create two queues
session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
session.exchange_bind(exchange=exchange_name, queue="used-queue", binding_key="used-key")
# test detection of any binding to specific queue
response = session.exchange_bound(exchange=exchange_name, queue="used-queue")
self.assert_(not response.exchange_not_found)
self.assert_(not response.queue_not_found)
self.assert_(not response.queue_not_matched)
# test detection of specific binding to any queue
response = session.exchange_bound(exchange=exchange_name, binding_key="used-key")
self.assert_(not response.exchange_not_found)
self.assert_(not response.queue_not_found)
self.assert_(not response.key_not_matched)
# test detection of specific binding to specific queue
response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="used-key")
self.assert_(not response.exchange_not_found)
self.assert_(not response.queue_not_found)
self.assert_(not response.queue_not_matched)
self.assert_(not response.key_not_matched)
# test unmatched queue, unspecified binding
response = session.exchange_bound(exchange=exchange_name, queue="unused-queue")
self.assert_(not response.exchange_not_found)
self.assert_(not response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
# test unspecified queue, unmatched binding
response = session.exchange_bound(exchange=exchange_name, binding_key="unused-key")
self.assert_(not response.exchange_not_found)
self.assert_(not response.queue_not_found)
self.assertEqual(True, response.key_not_matched)
# test matched queue, unmatched binding
response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="unused-key")
self.assert_(not response.exchange_not_found)
self.assert_(not response.queue_not_found)
self.assert_(not response.queue_not_matched)
self.assertEqual(True, response.key_not_matched)
# test unmatched queue, matched binding
response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="used-key")
self.assert_(not response.exchange_not_found)
self.assert_(not response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
self.assert_(not response.key_not_matched)
# test unmatched queue, unmatched binding
response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="unused-key")
self.assert_(not response.exchange_not_found)
self.assert_(not response.queue_not_found)
self.assertEqual(True, response.queue_not_matched)
self.assertEqual(True, response.key_not_matched)
#test exchange not found
self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found)
#test exchange found, queue not found
response = session.exchange_bound(exchange=exchange_name, queue="unknown-queue")
self.assertEqual(False, response.exchange_not_found)
self.assertEqual(True, response.queue_not_found)
#test exchange not found, queue found
response = session.exchange_bound(exchange="unknown-exchange", queue="used-queue")
self.assertEqual(True, response.exchange_not_found)
self.assertEqual(False, response.queue_not_found)
#test not exchange found, queue not found
response = session.exchange_bound(exchange="unknown-exchange", queue="unknown-queue")
self.assertEqual(True, response.exchange_not_found)
self.assertEqual(True, response.queue_not_found)