def test_enqueues_dequeues()

in qpid_tests/broker_0_10/stats.py [0:0]


    def test_enqueues_dequeues(self):
        """
        Check queue and broker enqueue/dequeue statistics around a send/fetch cycle.

        Four messages with 10, 20, 30 and 40 character bodies are sent to the
        "enqueue_test" queue, then two are fetched and acknowledged.  After each
        step the queue's message/byte totals and depths are compared with the
        expected values, and the broker-wide totals are compared with a snapshot
        taken at the start.  Finally the session is closed and the broker's
        abandoned count and depths are checked against that same snapshot.
        """
        qmf = self.setup_access()
        broker_at_start = qmf.getBroker()

        session = self.setup_session()
        sender = session.sender("enqueue_test;{create:always,delete:always}")
        receiver = session.receiver("enqueue_test")

        stats = qmf.getQueue("enqueue_test")
        self.failUnless(stats, "expected a valid queue object")

        # Nothing has been sent yet, so every counter must read zero.  The
        # failure text for each check is simply the counter's name.
        for counter in ("msgTotalEnqueues", "byteTotalEnqueues",
                        "msgTotalDequeues", "byteTotalDequeues",
                        "msgDepth", "byteDepth"):
            self.assertEqual(getattr(stats, counter), 0, counter)

        overhead = 38  # overhead added to each message from headers

        # Bodies of 10 + 20 + 30 + 40 = 100 characters in total.
        for body in ("0123456789",
                     "01234567890123456789",
                     "012345678901234567890123456789",
                     "0123456789012345678901234567890123456789"):
            sender.send(body)

        enqueued_bytes = 100 + (4 * overhead)

        stats.update()
        for counter, expected in (("msgTotalEnqueues", 4),
                                  ("byteTotalEnqueues", enqueued_bytes),
                                  ("msgTotalDequeues", 0),
                                  ("byteTotalDequeues", 0),
                                  ("msgDepth", 4),
                                  ("byteDepth", enqueued_bytes)):
            self.assertEqual(getattr(stats, counter), expected, counter)

        # Broker-wide totals are only checked as lower bounds relative to the
        # starting snapshot.
        broker_now = qmf.getBroker()
        enqueued_msgs_delta = broker_now.msgTotalEnqueues - broker_at_start.msgTotalEnqueues
        self.failUnless(enqueued_msgs_delta >= 4, "broker msgTotalEnqueues")
        enqueued_bytes_delta = broker_now.byteTotalEnqueues - broker_at_start.byteTotalEnqueues
        self.failUnless(enqueued_bytes_delta >= 100, "broker byteTotalEnqueues")

        # Consume two messages; the fetched messages themselves are not inspected.
        for _ in range(2):
            receiver.fetch()
        session.acknowledge()

        # The expected byte figures below correspond to the 10- and 20-character
        # bodies having been dequeued, leaving the 30- and 40-character ones.
        stats.update()
        for counter, expected in (("msgTotalEnqueues", 4),
                                  ("byteTotalEnqueues", enqueued_bytes),
                                  ("msgTotalDequeues", 2),
                                  ("byteTotalDequeues", 30 + (2 * overhead)),
                                  ("msgDepth", 2),
                                  ("byteDepth", 70 + (2 * overhead))):
            self.assertEqual(getattr(stats, counter), expected, counter)

        broker_now = qmf.getBroker()
        dequeued_msgs_delta = broker_now.msgTotalDequeues - broker_at_start.msgTotalDequeues
        self.failUnless(dequeued_msgs_delta >= 2, "broker msgTotalDequeues")
        dequeued_bytes_delta = broker_now.byteTotalDequeues - broker_at_start.byteTotalDequeues
        self.failUnless(dequeued_bytes_delta >= 30, "broker byteTotalDequeues")

        session.close()

        # After the session is closed, the two unfetched messages are expected
        # to be counted as abandoned and the broker depths to match the snapshot.
        broker_now = qmf.getBroker()
        abandoned_delta = broker_now.abandoned - broker_at_start.abandoned
        self.assertEqual(abandoned_delta, 2, "expect 2 abandoned messages")
        self.assertEqual(broker_now.msgDepth, broker_at_start.msgDepth,
                         "expect broker message depth to be unchanged")
        self.assertEqual(broker_now.byteDepth, broker_at_start.byteDepth,
                         "expect broker byte depth to be unchanged")