in qpid_tests/broker_0_8/basic.py [0:0]
def test_get(self):
    """
    Exercise basic_get on an exclusive queue, with and without acks.

    Scenario, in order:
      1. Publish messages 1-10, fetch each with no_ack=True, then check
         that the next get reports get_empty.
      2. Publish messages 11-20 and fetch each with no_ack=False. An ack
         with multiple=True is sent at message 13 and single acks at
         15, 17 and 19; the queue must then report get_empty.
      3. After basic_recover(requeue=True), messages 14, 16, 18 and 20
         must be fetchable again; each is acked as it is read.
      4. The queue must be empty again, and must stay empty after a
         second basic_recover(requeue=True).
    """
    queue_name = "test-get"
    singly_acked = (15, 17, 19)

    channel = self.channel
    channel.queue_declare(queue=queue_name, exclusive=True)

    # NOTE(review): basic_get is always called without a queue argument;
    # presumably the broker falls back to the queue declared above -- confirm.

    def publish(numbers):
        # Send one numbered message per entry to the test queue.
        for number in numbers:
            channel.basic_publish(routing_key=queue_name,
                                  content=Content("Message %d" % number))

    def fetch_expected(number, no_ack):
        # Get one message and verify it is a get_ok carrying the expected body.
        reply = channel.basic_get(no_ack=no_ack)
        self.assertEqual(reply.method.klass.name, "basic")
        self.assertEqual(reply.method.name, "get_ok")
        self.assertEqual("Message %d" % number, reply.content.body)
        return reply

    def expect_empty():
        # A further get must report that nothing is available.
        reply = channel.basic_get(no_ack=True)
        self.assertEqual(reply.method.klass.name, "basic")
        self.assertEqual(reply.method.name, "get_empty")

    # Phase 1: no_ack=True, so nothing is left pending on the channel.
    publish(range(1, 11))
    for number in range(1, 11):
        fetch_expected(number, no_ack=True)
    expect_empty()

    # Phase 2: no_ack=False with a mix of cumulative and single acks.
    publish(range(11, 21))
    for number in range(11, 21):
        reply = fetch_expected(number, no_ack=False)
        if number == 13:
            channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True)
        elif number in singly_acked:
            channel.basic_ack(delivery_tag=reply.delivery_tag)
    expect_empty()

    # Phase 3: requeue whatever is still unacked and read it back.
    channel.basic_recover(requeue=True)
    for number in (14, 16, 18, 20):
        reply = fetch_expected(number, no_ack=False)
        channel.basic_ack(delivery_tag=reply.delivery_tag)
    expect_empty()

    # Phase 4: everything has been acked, so a second recover yields nothing.
    channel.basic_recover(requeue=True)
    expect_empty()