in qpid_tests/broker_0_8/basic.py [0:0]
def test_qos_prefetch_size(self):
    """
    Check that a byte-based prefetch limit set via basic_qos is respected.

    Ten messages ("Message 1" .. "Message 10", 9 or 10 bytes each) are
    published to a queue whose consumer has a 50 byte prefetch size.
    The test expects them to arrive in two batches of five, the second
    batch only after the first has been acknowledged, and finally
    expects a single message larger than the prefetch size to be
    delivered anyway.
    """
    queue_name = "test-prefetch-size"
    ch = self.channel

    # Setup: exclusive queue plus a consumer that must ack explicitly.
    ch.queue_declare(queue=queue_name, exclusive=True)
    consumer = ch.basic_consume(queue=queue_name, no_ack=False)
    incoming = self.client.queue(consumer.consumer_tag)

    # Restrict the prefetch size to 50 bytes.
    ch.basic_qos(prefetch_size=50)

    # Publish the ten numbered messages.
    bodies = ["Message %d" % number for number in range(1, 11)]
    for body in bodies:
        ch.basic_publish(routing_key=queue_name, content=Content(body))

    # First batch: five messages (45 bytes) are expected to arrive.
    for expected in bodies[:5]:
        delivered = incoming.get(timeout=self.recv_timeout())
        self.assertEqual(expected, delivered.content.body)

    # Nothing further should arrive while those five are unacknowledged.
    try:
        surplus = incoming.get(timeout=self.recv_timeout())
    except Empty:
        pass
    else:
        self.fail("Got unexpected 6th message in original queue: " + surplus.content.body)

    # Ack everything up to the last received message, then expect the
    # remaining five messages.
    ch.basic_ack(delivery_tag=delivered.delivery_tag, multiple=True)
    for expected in bodies[5:]:
        delivered = incoming.get(timeout=self.recv_timeout())
        self.assertEqual(expected, delivered.content.body)

    # Ack the second batch before checking that the queue is drained.
    ch.basic_ack(delivery_tag=delivered.delivery_tag, multiple=True)
    try:
        surplus = incoming.get(timeout=self.recv_timeout())
    except Empty:
        pass
    else:
        self.fail("Got unexpected 11th message in original queue: " + surplus.content.body)

    # A single message bigger than the prefetch size (53 characters
    # against a 50 byte limit) must still be delivered.
    oversized = "-".join(["abcdefghijklmnopqrstuvwxyz"] * 2)
    ch.basic_publish(routing_key=queue_name, content=Content(oversized))
    delivered = incoming.get(timeout=self.recv_timeout())
    self.assertEqual(oversized, delivered.content.body)