def test_ack()

in qpid_tests/broker_0_10/message.py [0:0]


    def test_ack(self):
        """
        Test basic ack/recover behaviour using a combination of implicit and
        explicit accept subscriptions.
        """
        self.startQmf()
        session1 = self.conn.session("alternate-session", timeout=10)
        session1.queue_declare(queue="test-ack-queue", auto_delete=True)

        delivery_properties = session1.delivery_properties(routing_key="test-ack-queue")
        for i in ["One", "Two", "Three", "Four", "Five"]:
            session1.message_transfer(message=Message(delivery_properties, i))

        # verify enqueued message count, use both QMF and session query to verify consistency
        self.assertEqual(5, session1.queue_query(queue="test-ack-queue").message_count)
        queueObj = self.qmf.getObjects(_class="queue", name="test-ack-queue")[0]
        self.assertEquals(queueObj.msgDepth, 5)
        self.assertEquals(queueObj.msgTotalEnqueues, 5)
        self.assertEquals(queueObj.msgTotalDequeues, 0)

        # subscribe with implied acquire, explicit accept:
        session1.message_subscribe(queue = "test-ack-queue", destination = "consumer")
        session1.message_flow(destination="consumer", unit=session1.credit_unit.message, value=0xFFFFFFFF)
        session1.message_flow(destination="consumer", unit=session1.credit_unit.byte, value=0xFFFFFFFF)
        queue = session1.incoming("consumer")

        msg1 = queue.get(timeout=1)
        msg2 = queue.get(timeout=1)
        msg3 = queue.get(timeout=1)
        msg4 = queue.get(timeout=1)
        msg5 = queue.get(timeout=1)

        self.assertEqual("One", msg1.body)
        self.assertEqual("Two", msg2.body)
        self.assertEqual("Three", msg3.body)
        self.assertEqual("Four", msg4.body)
        self.assertEqual("Five", msg5.body)

        # messages should not be on the queue:
        self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count)
        # QMF shows the dequeues as not having happened yet, since they are have
        # not been accepted
        queueObj.update()
        self.assertEquals(queueObj.msgDepth, 5)
        self.assertEquals(queueObj.msgTotalEnqueues, 5)
        self.assertEquals(queueObj.msgTotalDequeues, 0)

        session1.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four

        # QMF should now reflect the accepted messages as being dequeued
        self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count)
        queueObj.update()
        self.assertEquals(queueObj.msgDepth, 2)
        self.assertEquals(queueObj.msgTotalEnqueues, 5)
        self.assertEquals(queueObj.msgTotalDequeues, 3)

        #subscribe from second session here to ensure queue is not auto-deleted
        #when alternate session closes.  Use implicit accept mode to test that
        #we don't need to explicitly accept
        session2 = self.conn.session("alternate-session-2", timeout=10)
        session2.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1)

        #now close the first session, and see that the unaccepted messages are
        #then redelivered to another subscriber:
        session1.close(timeout=10)

        # check the statistics - the queue_query will show the non-accepted
        # messages have been released. QMF never considered them dequeued, so
        # those counts won't change
        self.assertEqual(2, session2.queue_query(queue="test-ack-queue").message_count)
        queueObj.update()
        self.assertEquals(queueObj.msgDepth, 2)
        self.assertEquals(queueObj.msgTotalEnqueues, 5)
        self.assertEquals(queueObj.msgTotalDequeues, 3)

        session2.message_flow(destination="checker", unit=session2.credit_unit.message, value=0xFFFFFFFF)
        session2.message_flow(destination="checker", unit=session2.credit_unit.byte, value=0xFFFFFFFF)
        queue = session2.incoming("checker")

        msg3b = queue.get(timeout=1)
        msg5b = queue.get(timeout=1)

        self.assertEqual("Three", msg3b.body)
        self.assertEqual("Five", msg5b.body)

        try:
            extra = queue.get(timeout=1)
            self.fail("Got unexpected message: " + extra.body)
        except Empty: None

        self.assertEqual(0, session2.queue_query(queue="test-ack-queue").message_count)
        queueObj.update()
        self.assertEquals(queueObj.msgDepth, 0)
        self.assertEquals(queueObj.msgTotalEnqueues, 5)
        self.assertEquals(queueObj.msgTotalDequeues, 5)

        # Subscribe one last time to keep the queue available, and to verify
        # that the implied accept worked by verifying no messages have been
        # returned when session2 is closed.
        self.session.message_subscribe(queue = "test-ack-queue", destination = "final-checker")

        session2.close(timeout=10)

        # check the statistics - they should not have changed
        self.assertEqual(0, self.session.queue_query(queue="test-ack-queue").message_count)
        queueObj.update()
        self.assertEquals(queueObj.msgDepth, 0)
        self.assertEquals(queueObj.msgTotalEnqueues, 5)
        self.assertEquals(queueObj.msgTotalDequeues, 5)

        self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.message, value=0xFFFFFFFF)
        self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
        try:
            extra = self.session.incoming("final-checker").get(timeout=1)
            self.fail("Got unexpected message: " + extra.body)
        except Empty: None