in qpid_tests/broker_0_10/message.py [0:0]
def test_ack(self):
"""
Test basic ack/recover behaviour using a combination of implicit and
explicit accept subscriptions.
"""
self.startQmf()
session1 = self.conn.session("alternate-session", timeout=10)
session1.queue_declare(queue="test-ack-queue", auto_delete=True)
delivery_properties = session1.delivery_properties(routing_key="test-ack-queue")
for i in ["One", "Two", "Three", "Four", "Five"]:
session1.message_transfer(message=Message(delivery_properties, i))
# verify enqueued message count, use both QMF and session query to verify consistency
self.assertEqual(5, session1.queue_query(queue="test-ack-queue").message_count)
queueObj = self.qmf.getObjects(_class="queue", name="test-ack-queue")[0]
self.assertEquals(queueObj.msgDepth, 5)
self.assertEquals(queueObj.msgTotalEnqueues, 5)
self.assertEquals(queueObj.msgTotalDequeues, 0)
# subscribe with implied acquire, explicit accept:
session1.message_subscribe(queue = "test-ack-queue", destination = "consumer")
session1.message_flow(destination="consumer", unit=session1.credit_unit.message, value=0xFFFFFFFF)
session1.message_flow(destination="consumer", unit=session1.credit_unit.byte, value=0xFFFFFFFF)
queue = session1.incoming("consumer")
msg1 = queue.get(timeout=1)
msg2 = queue.get(timeout=1)
msg3 = queue.get(timeout=1)
msg4 = queue.get(timeout=1)
msg5 = queue.get(timeout=1)
self.assertEqual("One", msg1.body)
self.assertEqual("Two", msg2.body)
self.assertEqual("Three", msg3.body)
self.assertEqual("Four", msg4.body)
self.assertEqual("Five", msg5.body)
# messages should not be on the queue:
self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count)
# QMF shows the dequeues as not having happened yet, since they are have
# not been accepted
queueObj.update()
self.assertEquals(queueObj.msgDepth, 5)
self.assertEquals(queueObj.msgTotalEnqueues, 5)
self.assertEquals(queueObj.msgTotalDequeues, 0)
session1.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four
# QMF should now reflect the accepted messages as being dequeued
self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count)
queueObj.update()
self.assertEquals(queueObj.msgDepth, 2)
self.assertEquals(queueObj.msgTotalEnqueues, 5)
self.assertEquals(queueObj.msgTotalDequeues, 3)
#subscribe from second session here to ensure queue is not auto-deleted
#when alternate session closes. Use implicit accept mode to test that
#we don't need to explicitly accept
session2 = self.conn.session("alternate-session-2", timeout=10)
session2.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1)
#now close the first session, and see that the unaccepted messages are
#then redelivered to another subscriber:
session1.close(timeout=10)
# check the statistics - the queue_query will show the non-accepted
# messages have been released. QMF never considered them dequeued, so
# those counts won't change
self.assertEqual(2, session2.queue_query(queue="test-ack-queue").message_count)
queueObj.update()
self.assertEquals(queueObj.msgDepth, 2)
self.assertEquals(queueObj.msgTotalEnqueues, 5)
self.assertEquals(queueObj.msgTotalDequeues, 3)
session2.message_flow(destination="checker", unit=session2.credit_unit.message, value=0xFFFFFFFF)
session2.message_flow(destination="checker", unit=session2.credit_unit.byte, value=0xFFFFFFFF)
queue = session2.incoming("checker")
msg3b = queue.get(timeout=1)
msg5b = queue.get(timeout=1)
self.assertEqual("Three", msg3b.body)
self.assertEqual("Five", msg5b.body)
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected message: " + extra.body)
except Empty: None
self.assertEqual(0, session2.queue_query(queue="test-ack-queue").message_count)
queueObj.update()
self.assertEquals(queueObj.msgDepth, 0)
self.assertEquals(queueObj.msgTotalEnqueues, 5)
self.assertEquals(queueObj.msgTotalDequeues, 5)
# Subscribe one last time to keep the queue available, and to verify
# that the implied accept worked by verifying no messages have been
# returned when session2 is closed.
self.session.message_subscribe(queue = "test-ack-queue", destination = "final-checker")
session2.close(timeout=10)
# check the statistics - they should not have changed
self.assertEqual(0, self.session.queue_query(queue="test-ack-queue").message_count)
queueObj.update()
self.assertEquals(queueObj.msgDepth, 0)
self.assertEquals(queueObj.msgTotalEnqueues, 5)
self.assertEquals(queueObj.msgTotalDequeues, 5)
self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.message, value=0xFFFFFFFF)
self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
try:
extra = self.session.incoming("final-checker").get(timeout=1)
self.fail("Got unexpected message: " + extra.body)
except Empty: None