in qpid_tests/broker_0_10/stats.py [0:0]
def test_enqueues_dequeues(self):
agent = self.setup_access()
start_broker = agent.getBroker()
sess = self.setup_session()
tx = sess.sender("enqueue_test;{create:always,delete:always}")
rx = sess.receiver("enqueue_test")
queue = agent.getQueue("enqueue_test")
self.failUnless(queue, "expected a valid queue object")
self.assertEqual(queue.msgTotalEnqueues, 0, "msgTotalEnqueues")
self.assertEqual(queue.byteTotalEnqueues, 0, "byteTotalEnqueues")
self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues")
self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues")
self.assertEqual(queue.msgDepth, 0, "msgDepth")
self.assertEqual(queue.byteDepth, 0, "byteDepth")
tx.send("0123456789")
tx.send("01234567890123456789")
tx.send("012345678901234567890123456789")
tx.send("0123456789012345678901234567890123456789")
overhead = 38 #overhead added to message from headers
queue.update()
self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues")
self.assertEqual(queue.byteTotalEnqueues, 100+(4*overhead), "byteTotalEnqueues")
self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues")
self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues")
self.assertEqual(queue.msgDepth, 4, "msgDepth")
self.assertEqual(queue.byteDepth, 100+(4*overhead), "byteDepth")
now_broker = agent.getBroker()
self.failUnless((now_broker.msgTotalEnqueues - start_broker.msgTotalEnqueues) >= 4, "broker msgTotalEnqueues")
self.failUnless((now_broker.byteTotalEnqueues - start_broker.byteTotalEnqueues) >= 100, "broker byteTotalEnqueues")
m = rx.fetch()
m = rx.fetch()
sess.acknowledge()
queue.update()
self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues")
self.assertEqual(queue.byteTotalEnqueues, 100+(4*overhead), "byteTotalEnqueues")
self.assertEqual(queue.msgTotalDequeues, 2, "msgTotalDequeues")
self.assertEqual(queue.byteTotalDequeues, 30+(2*overhead), "byteTotalDequeues")
self.assertEqual(queue.msgDepth, 2, "msgDepth")
self.assertEqual(queue.byteDepth, 70+(2*overhead), "byteDepth")
now_broker = agent.getBroker()
self.failUnless((now_broker.msgTotalDequeues - start_broker.msgTotalDequeues) >= 2, "broker msgTotalDequeues")
self.failUnless((now_broker.byteTotalDequeues - start_broker.byteTotalDequeues) >= 30, "broker byteTotalDequeues")
sess.close()
now_broker = agent.getBroker()
self.assertEqual(now_broker.abandoned - start_broker.abandoned, 2, "expect 2 abandoned messages")
self.assertEqual(now_broker.msgDepth, start_broker.msgDepth, "expect broker message depth to be unchanged")
self.assertEqual(now_broker.byteDepth, start_broker.byteDepth, "expect broker byte depth to be unchanged")