in qpid_tests/broker_0_10/management.py [0:0]
def test_binding_count_on_queue(self):
self.startQmf()
conn = self.connect()
session = self.session
QUEUE = "binding_test_queue"
EX_DIR = "binding_test_exchange_direct"
EX_FAN = "binding_test_exchange_fanout"
EX_TOPIC = "binding_test_exchange_topic"
EX_HDR = "binding_test_exchange_headers"
#
# Create a test queue
#
session.queue_declare(queue=QUEUE, exclusive=True, auto_delete=True)
queue = self.qmf.getObjects(_class="queue", name=QUEUE)[0]
if not queue:
self.fail("Queue not found")
self.assertEqual(queue.bindingCount, 1, "wrong initial binding count")
#
# Create an exchange of each supported type
#
session.exchange_declare(exchange=EX_DIR, type="direct")
session.exchange_declare(exchange=EX_FAN, type="fanout")
session.exchange_declare(exchange=EX_TOPIC, type="topic")
session.exchange_declare(exchange=EX_HDR, type="headers")
#
# Bind each exchange to the test queue
#
match = {}
match['x-match'] = "all"
match['key'] = "value"
session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key1")
session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
session.exchange_bind(exchange=EX_FAN, queue=QUEUE)
session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key1.#")
session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key1", arguments=match)
match['key2'] = "value2"
session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key2", arguments=match)
#
# Verify that the queue's binding count accounts for the new bindings
#
queue.update()
self.assertEqual(queue.bindingCount, 8,
"added bindings not accounted for (expected 8, got %d)" % queue.bindingCount)
#
# Remove some of the bindings
#
session.exchange_unbind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
session.exchange_unbind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
session.exchange_unbind(exchange=EX_HDR, queue=QUEUE, binding_key="key2")
#
# Verify that the queue's binding count accounts for the deleted bindings
#
queue.update()
self.assertEqual(queue.bindingCount, 5,
"deleted bindings not accounted for (expected 5, got %d)" % queue.bindingCount)
#
# Delete the exchanges
#
session.exchange_delete(exchange=EX_DIR)
session.exchange_delete(exchange=EX_FAN)
session.exchange_delete(exchange=EX_TOPIC)
session.exchange_delete(exchange=EX_HDR)
#
# Verify that the queue's binding count accounts for the lost bindings
#
queue.update()
self.assertEqual(queue.bindingCount, 1,
"deleted bindings not accounted for (expected 1, got %d)" % queue.bindingCount)