def test_reroute_queue()

in qpid_tests/broker_0_10/management.py [0:0]


    def test_reroute_queue(self):
        """
        Test ability to reroute messages from the head of a queue.
        Need to test moving all, 1 (top message) and N messages.

        Scenario exercised, in order:
          1. Publish 20 messages (carrying an 'x-qpid.trace' header) to
             "reroute-queue" through amq.direct.
          2. Reroute the top message with the second reroute argument set to
             True and an empty exchange name; it is expected to land on
             "alt-queue1" (bound to the queue's alternate exchange) with its
             trace header cleared.
          3. Reroute the next 9 messages explicitly to "alt.direct2".
          4. Attempt a reroute to a non-existent exchange and expect status 4.
          5. Reroute all remaining messages to "alt.direct2".
          6. Publish 20 more messages and reroute them to amq.direct, which
             routes them back onto "reroute-queue" itself.
        """
        self.startQmf()
        session = self.session

        # Set up test queue
        session.exchange_declare(exchange="alt.direct1", type="direct")
        session.queue_declare(queue="alt-queue1", exclusive=True, auto_delete=True)
        session.exchange_bind(queue="alt-queue1", exchange="alt.direct1", binding_key="routing_key")
        session.exchange_declare(exchange="alt.direct2", type="direct")
        session.queue_declare(queue="alt-queue2", exclusive=True, auto_delete=True)
        session.exchange_bind(queue="alt-queue2", exchange="alt.direct2", binding_key="routing_key")
        session.queue_declare(queue="reroute-queue", exclusive=True, auto_delete=True, alternate_exchange="alt.direct1")
        session.exchange_bind(queue="reroute-queue", exchange="amq.direct", binding_key="routing_key")

        # Message numbers 1..20; reused for both batches published below.
        twenty = range(1, 21)
        props = session.delivery_properties(routing_key="routing_key")
        mp = session.message_properties(application_headers={'x-qpid.trace' : 'A,B,C'})
        for count in twenty:
            body = "Reroute Message %d" % count
            msg = Message(props, mp, body)
            session.message_transfer(destination="amq.direct", message=msg)

        pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]

        # Reroute top message from reroute-queue to alternate exchange
        result = pq.reroute(1, True, "", {})
        self.assertEqual(result.status, 0)
        pq.update()
        aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0]
        self.assertEqual(pq.msgDepth, 19)
        self.assertEqual(aq.msgDepth, 1)

        # Verify that the trace was cleared on the rerouted message
        url = "%s://%s:%d" % (self.broker.scheme or "amqp", self.broker.host, self.broker.port or 5672)
        conn = qpid.messaging.Connection(url)
        conn.open()
        try:
            sess = conn.session()
            # Browse rather than consume so the depth of alt-queue1 is untouched.
            rx = sess.receiver("alt-queue1;{mode:browse}")
            rm = rx.fetch(1)
            self.assertEqual(rm.properties['x-qpid.trace'], '')
        finally:
            # Always release the connection, even when the fetch or the
            # assertion above fails, so it does not leak into later tests.
            conn.close()

        # Reroute top 9 messages from reroute-queue to alt.direct2
        result = pq.reroute(9, False, "alt.direct2", {})
        self.assertEqual(result.status, 0)
        pq.update()
        aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
        self.assertEqual(pq.msgDepth, 10)
        self.assertEqual(aq.msgDepth, 9)

        # Reroute using a non-existent exchange
        result = pq.reroute(0, False, "amq.nosuchexchange", {})
        self.assertEqual(result.status, 4)

        # Reroute all messages from reroute-queue
        result = pq.reroute(0, False, "alt.direct2", {})
        self.assertEqual(result.status, 0)
        pq.update()
        aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
        self.assertEqual(pq.msgDepth, 0)
        # 9 messages from the earlier partial reroute plus the remaining 10.
        self.assertEqual(aq.msgDepth, 19)

        # Make more messages (this batch carries no message properties)
        props = session.delivery_properties(routing_key="routing_key")
        for count in twenty:
            body = "Reroute Message %d" % count
            msg = Message(props, body)
            session.message_transfer(destination="amq.direct", message=msg)

        # Reroute onto the same queue
        result = pq.reroute(0, False, "amq.direct", {})
        self.assertEqual(result.status, 0)
        pq.update()
        self.assertEqual(pq.msgDepth, 20)