def test_exchange_bound_header()

in qpid_tests/broker_0_10/query.py [0:0]


    def test_exchange_bound_header(self):
        """
        Test that the exchange_bound method works as expected with headers exchanges

        Two exclusive, auto-delete queues are declared: "used-queue", which is
        bound to amq.match with the arguments {"x-match": "all", "a": "A"},
        and "unused-queue", which is never bound. exchange_bound is then
        queried with each combination of (bound / unbound / unspecified queue)
        and (matching / non-matching / unspecified binding arguments), and
        finally with an unknown exchange and an unknown queue.
        """
        session = self.session

        # Arguments of the one binding that exists, and a set that matches no
        # binding; named once so every query below uses the same values.
        bound_args = {"x-match": "all", "a": "A"}
        unbound_args = {"x-match": "all", "b": "B"}

        # setup
        session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
        session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
        session.exchange_bind(exchange="amq.match", queue="used-queue", arguments=bound_args)

        # NOTE: negative checks use assertFalse rather than the deprecated
        # assert_ alias, which no longer exists in Python 3.12+.

        # test detection of any binding to specific queue
        response = session.exchange_bound(exchange="amq.match", queue="used-queue")
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertFalse(response.queue_not_matched)

        # test detection of specific binding to any queue
        response = session.exchange_bound(exchange="amq.match", arguments=bound_args)
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertFalse(response.args_not_matched)

        # test detection of specific binding to specific queue
        response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments=bound_args)
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertFalse(response.queue_not_matched)
        self.assertFalse(response.args_not_matched)

        # test unmatched queue, unspecified binding
        response = session.exchange_bound(exchange="amq.match", queue="unused-queue")
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertEqual(True, response.queue_not_matched)

        # test unspecified queue, unmatched binding
        response = session.exchange_bound(exchange="amq.match", arguments=unbound_args)
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertEqual(True, response.args_not_matched)

        # test matched queue, unmatched binding
        response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments=unbound_args)
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertFalse(response.queue_not_matched)
        self.assertEqual(True, response.args_not_matched)

        # test unmatched queue, matched binding
        response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments=bound_args)
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertEqual(True, response.queue_not_matched)
        self.assertFalse(response.args_not_matched)

        # test unmatched queue, unmatched binding
        response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments=unbound_args)
        self.assertFalse(response.exchange_not_found)
        self.assertFalse(response.queue_not_found)
        self.assertEqual(True, response.queue_not_matched)
        self.assertEqual(True, response.args_not_matched)

        # test exchange not found
        response = session.exchange_bound(exchange="unknown-exchange")
        self.assertEqual(True, response.exchange_not_found)

        # test queue not found
        response = session.exchange_bound(exchange="amq.match", queue="unknown-queue")
        self.assertEqual(True, response.queue_not_found)