in qpid_tests/broker_0_10/management.py [0:0]
def test_move_queued_messages(self):
    """
    Test ability to move messages from the head of one queue to another.

    Exercises the broker's queueMoveMessages management method for:
    moving N messages, moving all remaining messages (quantity 0),
    requesting more messages than are queued, and unknown source or
    destination queue names. Finally verifies that the messages moved
    out and back are still all present and in their original order.
    """
    self.startQmf()
    session = self.session

    def move(src, dest, qty):
        # Invoke queueMoveMessages on the broker management object.  The
        # broker object is looked up afresh for every invocation, exactly
        # as each step of this test has always done.
        broker = self.qmf.getObjects(_class="broker")[0]
        return broker.queueMoveMessages(src, dest, qty, {})

    def assert_depths(src_depth, dest_depth):
        # Fetch both queue objects first, then compare their depths, so
        # the query/assert ordering matches the original inline code.
        sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
        dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
        self.assertEqual(sq.msgDepth, src_depth)
        self.assertEqual(dq.msgDepth, dest_depth)

    # Bodies of the twenty test messages, in publication order; reused at
    # the end to check ordering of the consumed messages.
    bodies = ["Move Message %d" % count for count in range(1, 21)]

    # Set up source queue and fill it via amq.direct
    session.queue_declare(queue="src-queue", exclusive=True, auto_delete=True)
    session.exchange_bind(queue="src-queue", exchange="amq.direct", binding_key="routing_key")
    props = session.delivery_properties(routing_key="routing_key")
    for body in bodies:
        session.message_transfer(destination="amq.direct", message=Message(props, body))

    # Set up destination queue (bound without an explicit binding_key)
    session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True)
    session.exchange_bind(queue="dest-queue", exchange="amq.direct")

    # The result of this query was never used.  The call itself is kept
    # because it talks to the management agent.
    # NOTE(review): confirm whether it is needed at all.
    self.qmf.getObjects(_class="queue")

    # Move 10 messages from src-queue to dest-queue
    result = move("src-queue", "dest-queue", 10)
    self.assertEqual(result.status, 0)
    assert_depths(10, 10)

    # Move all remaining messages to destination (quantity 0)
    result = move("src-queue", "dest-queue", 0)
    self.assertEqual(result.status, 0)
    assert_depths(0, 20)

    # Use a bad source queue name
    result = move("bad-src-queue", "dest-queue", 0)
    self.assertEqual(result.status, 4)

    # Use a bad destination queue name
    result = move("src-queue", "bad-dest-queue", 0)
    self.assertEqual(result.status, 4)

    # Use a large qty (40) to move from dest-queue back to src-queue;
    # this should move all 20 messages
    result = move("dest-queue", "src-queue", 40)
    self.assertEqual(result.status, 0)
    assert_depths(20, 0)

    # Consume the messages of the queue and check they are all there in order
    session.message_subscribe(queue="src-queue", destination="tag")
    session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFF)
    session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFF)
    incoming = session.incoming("tag")
    for body in bodies:
        consumed_msg = incoming.get(timeout=1)
        self.assertEqual(body, consumed_msg.body)