def test_move_queued_messages()

in qpid_tests/broker_0_10/management.py [0:0]


    def test_move_queued_messages(self):
        """
        Test ability to move messages from the head of one queue to another.

        Exercises the broker's queueMoveMessages management method with a
        fixed count (10), with a count of 0 (asserted below to move every
        remaining message), with unknown source/destination queue names, and
        with a count larger than the queue depth.  Finally the messages are
        consumed from the source queue to check that all of them are present
        and still in their original order.
        """
        self.startQmf()
        session = self.session

        message_count = 20
        expected_bodies = ["Move Message %d" % n
                           for n in range(1, message_count + 1)]

        def move_messages(src, dest, qty):
            # Fetch the broker management object for each call and invoke
            # queueMoveMessages on it; returns the method result so that the
            # caller can check its status.
            broker = self.qmf.getObjects(_class="broker")[0]
            return broker.queueMoveMessages(src, dest, qty, {})

        def assert_depths(src_depth, dest_depth):
            # Re-query both queue objects so the depths reflect the state
            # after the most recent move, then compare them.
            sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
            dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
            self.assertEqual(sq.msgDepth, src_depth)
            self.assertEqual(dq.msgDepth, dest_depth)

        # Set up source queue and fill it via the direct exchange.
        session.queue_declare(queue="src-queue", exclusive=True, auto_delete=True)
        session.exchange_bind(queue="src-queue", exchange="amq.direct", binding_key="routing_key")

        props = session.delivery_properties(routing_key="routing_key")
        for body in expected_bodies:
            src_msg = Message(props, body)
            session.message_transfer(destination="amq.direct", message=src_msg)

        # Set up destination queue.  It is bound without a binding key, so
        # it is not expected to receive the messages published above.
        session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True)
        session.exchange_bind(queue="dest-queue", exchange="amq.direct")

        # The result of this query is not used by the test.
        # NOTE(review): the call itself is kept because the original test
        # performed this broker round trip before the first move; confirm
        # whether it can be removed safely.
        self.qmf.getObjects(_class="queue")

        # Move 10 messages from src-queue to dest-queue.
        result = move_messages("src-queue", "dest-queue", 10)
        self.assertEqual(result.status, 0)
        assert_depths(10, 10)

        # Move all remaining messages to destination (qty 0).
        result = move_messages("src-queue", "dest-queue", 0)
        self.assertEqual(result.status, 0)
        assert_depths(0, 20)

        # Use a bad source queue name.
        # NOTE(review): status 4 is presumably the management
        # "invalid parameter" code -- confirm against the broker schema.
        result = move_messages("bad-src-queue", "dest-queue", 0)
        self.assertEqual(result.status, 4)

        # Use a bad destination queue name.
        result = move_messages("src-queue", "bad-dest-queue", 0)
        self.assertEqual(result.status, 4)

        # Use a large qty (40) to move from dest-queue back to src-queue;
        # this should move all 20 messages.
        result = move_messages("dest-queue", "src-queue", 40)
        self.assertEqual(result.status, 0)
        assert_depths(20, 0)

        # Consume the messages of the queue and check they are all there
        # in order.
        session.message_subscribe(queue="src-queue", destination="tag")
        # NOTE(review): 0xFFFFFFFF presumably means unlimited credit -- confirm.
        session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFF)
        session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFF)
        queue = session.incoming("tag")
        for body in expected_bodies:
            consumed_msg = queue.get(timeout=1)
            self.assertEqual(body, consumed_msg.body)