def test_binding_count_on_queue()

in qpid_tests/broker_0_10/management.py [0:0]


    def test_binding_count_on_queue(self):
        self.startQmf()
        conn = self.connect()
        session = self.session

        QUEUE = "binding_test_queue"
        EX_DIR = "binding_test_exchange_direct"
        EX_FAN = "binding_test_exchange_fanout"
        EX_TOPIC = "binding_test_exchange_topic"
        EX_HDR = "binding_test_exchange_headers"

        #
        # Create a test queue
        #
        session.queue_declare(queue=QUEUE, exclusive=True, auto_delete=True)
        queue = self.qmf.getObjects(_class="queue", name=QUEUE)[0]
        if not queue:
            self.fail("Queue not found")
        self.assertEqual(queue.bindingCount, 1, "wrong initial binding count")

        #
        # Create an exchange of each supported type
        #
        session.exchange_declare(exchange=EX_DIR, type="direct")
        session.exchange_declare(exchange=EX_FAN, type="fanout")
        session.exchange_declare(exchange=EX_TOPIC, type="topic")
        session.exchange_declare(exchange=EX_HDR, type="headers")

        #
        # Bind each exchange to the test queue
        #
        match = {}
        match['x-match'] = "all"
        match['key'] = "value"
        session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key1")
        session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
        session.exchange_bind(exchange=EX_FAN, queue=QUEUE)
        session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key1.#")
        session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
        session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key1", arguments=match)
        match['key2'] = "value2"
        session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key2", arguments=match)

        #
        # Verify that the queue's binding count accounts for the new bindings
        #
        queue.update()
        self.assertEqual(queue.bindingCount, 8,
                         "added bindings not accounted for (expected 8, got %d)" % queue.bindingCount)

        #
        # Remove some of the bindings
        #
        session.exchange_unbind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
        session.exchange_unbind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
        session.exchange_unbind(exchange=EX_HDR, queue=QUEUE, binding_key="key2")

        #
        # Verify that the queue's binding count accounts for the deleted bindings
        #
        queue.update()
        self.assertEqual(queue.bindingCount, 5,
                         "deleted bindings not accounted for (expected 5, got %d)" % queue.bindingCount)
        #
        # Delete the exchanges
        #
        session.exchange_delete(exchange=EX_DIR)
        session.exchange_delete(exchange=EX_FAN)
        session.exchange_delete(exchange=EX_TOPIC)
        session.exchange_delete(exchange=EX_HDR)

        #
        # Verify that the queue's binding count accounts for the lost bindings
        #
        queue.update()
        self.assertEqual(queue.bindingCount, 1,
                         "deleted bindings not accounted for (expected 1, got %d)" % queue.bindingCount)