in qpid_tests/broker_0_10/management.py [0:0]
def test_reroute_queue(self):
"""
Test ability to reroute messages from the head of a queue.
Need to test moving all, 1 (top message) and N messages.
"""
self.startQmf()
session = self.session
"Set up test queue"
session.exchange_declare(exchange="alt.direct1", type="direct")
session.queue_declare(queue="alt-queue1", exclusive=True, auto_delete=True)
session.exchange_bind(queue="alt-queue1", exchange="alt.direct1", binding_key="routing_key")
session.exchange_declare(exchange="alt.direct2", type="direct")
session.queue_declare(queue="alt-queue2", exclusive=True, auto_delete=True)
session.exchange_bind(queue="alt-queue2", exchange="alt.direct2", binding_key="routing_key")
session.queue_declare(queue="reroute-queue", exclusive=True, auto_delete=True, alternate_exchange="alt.direct1")
session.exchange_bind(queue="reroute-queue", exchange="amq.direct", binding_key="routing_key")
twenty = range(1,21)
props = session.delivery_properties(routing_key="routing_key")
mp = session.message_properties(application_headers={'x-qpid.trace' : 'A,B,C'})
for count in twenty:
body = "Reroute Message %d" % count
msg = Message(props, mp, body)
session.message_transfer(destination="amq.direct", message=msg)
pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]
"Reroute top message from reroute-queue to alternate exchange"
result = pq.reroute(1, True, "", {})
self.assertEqual(result.status, 0)
pq.update()
aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0]
self.assertEqual(pq.msgDepth,19)
self.assertEqual(aq.msgDepth,1)
"Verify that the trace was cleared on the rerouted message"
url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host, self.broker.port or 5672)
conn = qpid.messaging.Connection(url)
conn.open()
sess = conn.session()
rx = sess.receiver("alt-queue1;{mode:browse}")
rm = rx.fetch(1)
self.assertEqual(rm.properties['x-qpid.trace'], '')
conn.close()
"Reroute top 9 messages from reroute-queue to alt.direct2"
result = pq.reroute(9, False, "alt.direct2", {})
self.assertEqual(result.status, 0)
pq.update()
aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
self.assertEqual(pq.msgDepth,10)
self.assertEqual(aq.msgDepth,9)
"Reroute using a non-existent exchange"
result = pq.reroute(0, False, "amq.nosuchexchange", {})
self.assertEqual(result.status, 4)
"Reroute all messages from reroute-queue"
result = pq.reroute(0, False, "alt.direct2", {})
self.assertEqual(result.status, 0)
pq.update()
aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
self.assertEqual(pq.msgDepth,0)
self.assertEqual(aq.msgDepth,19)
"Make more messages"
twenty = range(1,21)
props = session.delivery_properties(routing_key="routing_key")
for count in twenty:
body = "Reroute Message %d" % count
msg = Message(props, body)
session.message_transfer(destination="amq.direct", message=msg)
"Reroute onto the same queue"
result = pq.reroute(0, False, "amq.direct", {})
self.assertEqual(result.status, 0)
pq.update()
self.assertEqual(pq.msgDepth,20)