def test_qos_prefetch_size()

in qpid_tests/broker_0_8/basic.py [0:0]


    def test_qos_prefetch_size(self):
        """
        Test that the prefetch size specified is honoured
        """
        #setup: declare queue and subscribe
        channel = self.channel
        channel.queue_declare(queue="test-prefetch-size", exclusive=True)
        subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False)
        queue = self.client.queue(subscription.consumer_tag)

        #set prefetch to 50 bytes (each message is 9 or 10 bytes):
        channel.basic_qos(prefetch_size=50)

        #publish 10 messages:
        for i in range(1, 11):
            channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i))

        #only 5 messages should have been delivered (i.e. 45 bytes worth):
        for i in range(1, 6):
            msg = queue.get(timeout=self.recv_timeout())
            self.assertEqual("Message %d" % i, msg.content.body)

        try:
            extra = queue.get(timeout=self.recv_timeout())
            self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
        except Empty: None

        #ack messages and check that the next set arrive ok:
        channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)

        for i in range(6, 11):
            msg = queue.get(timeout=self.recv_timeout())
            self.assertEqual("Message %d" % i, msg.content.body)

        channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)

        try:
            extra = queue.get(timeout=self.recv_timeout())
            self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
        except Empty: None

        #make sure that a single oversized message still gets delivered
        large = "abcdefghijklmnopqrstuvwxyz"
        large = large + "-" + large
        channel.basic_publish(routing_key="test-prefetch-size", content=Content(large))
        msg = queue.get(timeout=self.recv_timeout())
        self.assertEqual(large, msg.content.body)