def exchange_bound_with_key()

in qpid_tests/broker_0_10/query.py [0:0]


    def exchange_bound_with_key(self, exchange_name):
        """
        Check exchange_bound results for an exchange with a keyed binding.

        Declares two exclusive, auto-delete queues ("used-queue" and
        "unused-queue"), binds only "used-queue" to exchange_name with the
        binding key "used-key", and then issues exchange_bound queries for
        each combination of matched, unmatched and unknown queue, binding
        key and exchange, asserting on the flags of every response.

        exchange_name: name of the exchange to bind to and to query. It is
        never declared in this method, and the checks require the broker to
        report it as found.
        """
        session = self.session

        # setup: create two queues, only one of which gets a binding
        session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
        session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)

        session.exchange_bind(exchange=exchange_name, queue="used-queue", binding_key="used-key")

        # test detection of any binding to specific queue
        response = session.exchange_bound(exchange=exchange_name, queue="used-queue")
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertFalse(response.queue_not_matched)

        # test detection of specific binding to any queue
        response = session.exchange_bound(exchange=exchange_name, binding_key="used-key")
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertFalse(response.key_not_matched)

        # test detection of specific binding to specific queue
        response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="used-key")
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertFalse(response.queue_not_matched)
        self.assertFalse(response.key_not_matched)

        # test unmatched queue, unspecified binding
        response = session.exchange_bound(exchange=exchange_name, queue="unused-queue")
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertEqual(True, response.queue_not_matched)

        # test unspecified queue, unmatched binding
        response = session.exchange_bound(exchange=exchange_name, binding_key="unused-key")
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertEqual(True, response.key_not_matched)

        # test matched queue, unmatched binding
        response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="unused-key")
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertFalse(response.queue_not_matched)
        self.assertEqual(True, response.key_not_matched)

        # test unmatched queue, matched binding
        response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="used-key")
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertEqual(True, response.queue_not_matched)
        self.assertFalse(response.key_not_matched)

        # test unmatched queue, unmatched binding
        response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="unused-key")
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertEqual(True, response.queue_not_matched)
        self.assertEqual(True, response.key_not_matched)

        # The remaining checks use "unknown-exchange" and "unknown-queue",
        # names that this method never declares.

        # test exchange not found
        self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found)

        # test exchange found, queue not found
        response = session.exchange_bound(exchange=exchange_name, queue="unknown-queue")
        self.assertEqual(False, response.exchange_not_found)
        self.assertEqual(True, response.queue_not_found)

        # test exchange not found, queue found
        response = session.exchange_bound(exchange="unknown-exchange", queue="used-queue")
        self.assertEqual(True, response.exchange_not_found)
        self.assertEqual(False, response.queue_not_found)

        # test exchange not found, queue not found
        response = session.exchange_bound(exchange="unknown-exchange", queue="unknown-queue")
        self.assertEqual(True, response.exchange_not_found)
        self.assertEqual(True, response.queue_not_found)