#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import absolute_import
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.testlib import TestBase010
from qpid.datatypes import Message, RangedSet
from qpid.session import SessionException

from qpid.content import Content
from time import sleep

class MessageTests(TestBase010):
    """Tests for 'methods' on the amqp message 'class'"""

    def test_no_local(self):
        """
        NOTE: this is a test of a QPID specific feature
        
        Test that the qpid specific no_local arg is honoured.
        """
        session = self.session
        #setup, declare two queues one of which excludes delivery of locally sent messages
        session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
        session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True, arguments={'no-local':'true'})
        #establish two consumers 
        self.subscribe(destination="local_included", queue="test-queue-1a")
        self.subscribe(destination="local_excluded", queue="test-queue-1b")

        #send a message
        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me"))
        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me"))

        #send a message from another session on the same connection to each queue
        session2 = self.conn.session("my-local-session")
        session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well"))
        session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either"))

        #send a message from a session on another connection to each queue
        for q in ["test-queue-1a", "test-queue-1b"]:
            session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key")
        other = self.connect()
        session3 = other.session("my-other-session")
        session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local"))
        other.close()

        #check the queues of the two consumers
        excluded = session.incoming("local_excluded")
        included = session.incoming("local_included")
        for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]:
            msg = included.get(timeout=1)
            self.assertEqual(b, msg.body)
        msg = excluded.get(timeout=1)
        self.assertEqual("i-am-not-local", msg.body)
        try:
            excluded.get(timeout=1)
            self.fail("Received locally published message though no_local=true")
        except Empty: None

    def test_no_local_awkward(self):

        """
        NOTE: this is a test of a QPID specific feature
        
        Check that messages which will be excluded through no-local
        processing will not block subsequent deliveries
        """

        session = self.session
        #setup:
        session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'no-local':'true'})
        #establish consumer which excludes delivery of locally sent messages
        self.subscribe(destination="local_excluded", queue="test-queue")

        #send a 'local' message
        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "local"))

        #send a non local message
        other = self.connect()
        session2 = other.session("my-session", 1)
        session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue"), "foreign"))
        session2.close()
        other.close()

        #check that the second message only is delivered
        excluded = session.incoming("local_excluded")
        msg = excluded.get(timeout=1)
        self.assertEqual("foreign", msg.body)
        try:
            excluded.get(timeout=1)
            self.fail("Received extra message")
        except Empty: None
        #check queue is empty
        self.assertEqual(0, session.queue_query(queue="test-queue").message_count)

    def test_no_local_exclusive_subscribe(self):
        """
        NOTE: this is a test of a QPID specific feature

        Test that the no_local processing works on queues not declared
        as exclusive, but with an exclusive subscription
        """
        session = self.session

        #setup, declare two queues one of which excludes delivery of
        #locally sent messages but is not declared as exclusive
        session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
        session.queue_declare(queue="test-queue-1b", auto_delete=True, arguments={'no-local':'true'})
        #establish two consumers 
        self.subscribe(destination="local_included", queue="test-queue-1a")
        self.subscribe(destination="local_excluded", queue="test-queue-1b", exclusive=True)

        #send a message from the same session to each queue
        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me"))
        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me"))

        #send a message from another session on the same connection to each queue
        session2 = self.conn.session("my-session")
        session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well"))
        session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either"))

        #send a message from a session on another connection to each queue
        for q in ["test-queue-1a", "test-queue-1b"]:
            session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key")
        other = self.connect()
        session3 = other.session("my-other-session")
        session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local"))
        other.close()

        #check the queues of the two consumers
        excluded = session.incoming("local_excluded")
        included = session.incoming("local_included")
        for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]:
            msg = included.get(timeout=1)
            self.assertEqual(b, msg.body)
        msg = excluded.get(timeout=1)
        self.assertEqual("i-am-not-local", msg.body)
        try:
            excluded.get(timeout=1)
            self.fail("Received locally published message though no_local=true")
        except Empty: None


    def test_consume_exclusive(self):
        """
        Test an exclusive consumer prevents other consumer being created
        """
        session = self.session
        session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
        session.message_subscribe(destination="first", queue="test-queue-2", exclusive=True)
        try:
            session.message_subscribe(destination="second", queue="test-queue-2")
            self.fail("Expected consume request to fail due to previous exclusive consumer")
        except SessionException as e:
            self.assertEquals(405, e.args[0].error_code)

    def test_consume_exclusive2(self):
        """
        Check that an exclusive consumer cannot be created if a consumer already exists:
        """
        session = self.session
        session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
        session.message_subscribe(destination="first", queue="test-queue-2")
        try:
            session.message_subscribe(destination="second", queue="test-queue-2", exclusive=True)
            self.fail("Expected exclusive consume request to fail due to previous consumer")
        except SessionException as e:
            self.assertEquals(405, e.args[0].error_code)

    def test_consume_queue_not_found(self):
        """
        Test error conditions associated with the queue field of the consume method:
        """
        session = self.session
        try:
            #queue specified but doesn't exist:
            session.message_subscribe(queue="invalid-queue", destination="a")
            self.fail("Expected failure when consuming from non-existent queue")
        except SessionException as e:
            self.assertEquals(404, e.args[0].error_code)

    def test_consume_queue_not_specified(self):
        session = self.session
        try:
            #queue not specified and none previously declared for channel:
            session.message_subscribe(destination="a")
            self.fail("Expected failure when consuming from unspecified queue")
        except SessionException as e:
            self.assertEquals(531, e.args[0].error_code)

    def test_consume_unique_consumers(self):
        """
        Ensure unique consumer tags are enforced
        """
        session = self.session
        #setup, declare a queue:
        session.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True)

        #check that attempts to use duplicate tags are detected and prevented:
        session.message_subscribe(destination="first", queue="test-queue-3")
        try:
            session.message_subscribe(destination="first", queue="test-queue-3")
            self.fail("Expected consume request to fail due to non-unique tag")
        except SessionException as e:
            self.assertEquals(530, e.args[0].error_code)

    def test_cancel(self):
        """
        Test compliance of the basic.cancel method
        """
        session = self.session
        #setup, declare a queue:
        session.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True)
        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "One"))

        session.message_subscribe(destination="my-consumer", queue="test-queue-4")
        myqueue = session.incoming("my-consumer")
        session.message_flow(destination="my-consumer", unit=session.credit_unit.message, value=0xFFFFFFFF)
        session.message_flow(destination="my-consumer", unit=session.credit_unit.byte, value=0xFFFFFFFF)

        #should flush here

        #cancel should stop messages being delivered
        session.message_cancel(destination="my-consumer")
        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "Two"))
        msg = myqueue.get(timeout=1)
        self.assertEqual("One", msg.body)
        try:
            msg = myqueue.get(timeout=1)
            self.fail("Got message after cancellation: " + msg)
        except Empty: None

        #cancellation of non-existant consumers should be result in 404s
        try:
            session.message_cancel(destination="my-consumer")
            self.fail("Expected 404 for recancellation of subscription.")
        except SessionException as e:
            self.assertEquals(404, e.args[0].error_code)

        session = self.conn.session("alternate-session", timeout=10)
        try:
            session.message_cancel(destination="this-never-existed")
            self.fail("Expected 404 for cancellation of unknown subscription.")
        except SessionException as e:
            self.assertEquals(404, e.args[0].error_code)


    def test_ack(self):
        """
        Test basic ack/recover behaviour using a combination of implicit and
        explicit accept subscriptions.
        """
        self.startQmf()
        session1 = self.conn.session("alternate-session", timeout=10)
        session1.queue_declare(queue="test-ack-queue", auto_delete=True)

        delivery_properties = session1.delivery_properties(routing_key="test-ack-queue")
        for i in ["One", "Two", "Three", "Four", "Five"]:
            session1.message_transfer(message=Message(delivery_properties, i))

        # verify enqueued message count, use both QMF and session query to verify consistency
        self.assertEqual(5, session1.queue_query(queue="test-ack-queue").message_count)
        queueObj = self.qmf.getObjects(_class="queue", name="test-ack-queue")[0]
        self.assertEquals(queueObj.msgDepth, 5)
        self.assertEquals(queueObj.msgTotalEnqueues, 5)
        self.assertEquals(queueObj.msgTotalDequeues, 0)

        # subscribe with implied acquire, explicit accept:
        session1.message_subscribe(queue = "test-ack-queue", destination = "consumer")
        session1.message_flow(destination="consumer", unit=session1.credit_unit.message, value=0xFFFFFFFF)
        session1.message_flow(destination="consumer", unit=session1.credit_unit.byte, value=0xFFFFFFFF)
        queue = session1.incoming("consumer")

        msg1 = queue.get(timeout=1)
        msg2 = queue.get(timeout=1)
        msg3 = queue.get(timeout=1)
        msg4 = queue.get(timeout=1)
        msg5 = queue.get(timeout=1)

        self.assertEqual("One", msg1.body)
        self.assertEqual("Two", msg2.body)
        self.assertEqual("Three", msg3.body)
        self.assertEqual("Four", msg4.body)
        self.assertEqual("Five", msg5.body)

        # messages should not be on the queue:
        self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count)
        # QMF shows the dequeues as not having happened yet, since they are have
        # not been accepted
        queueObj.update()
        self.assertEquals(queueObj.msgDepth, 5)
        self.assertEquals(queueObj.msgTotalEnqueues, 5)
        self.assertEquals(queueObj.msgTotalDequeues, 0)

        session1.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four

        # QMF should now reflect the accepted messages as being dequeued
        self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count)
        queueObj.update()
        self.assertEquals(queueObj.msgDepth, 2)
        self.assertEquals(queueObj.msgTotalEnqueues, 5)
        self.assertEquals(queueObj.msgTotalDequeues, 3)

        #subscribe from second session here to ensure queue is not auto-deleted
        #when alternate session closes.  Use implicit accept mode to test that
        #we don't need to explicitly accept
        session2 = self.conn.session("alternate-session-2", timeout=10)
        session2.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1)

        #now close the first session, and see that the unaccepted messages are
        #then redelivered to another subscriber:
        session1.close(timeout=10)

        # check the statistics - the queue_query will show the non-accepted
        # messages have been released. QMF never considered them dequeued, so
        # those counts won't change
        self.assertEqual(2, session2.queue_query(queue="test-ack-queue").message_count)
        queueObj.update()
        self.assertEquals(queueObj.msgDepth, 2)
        self.assertEquals(queueObj.msgTotalEnqueues, 5)
        self.assertEquals(queueObj.msgTotalDequeues, 3)

        session2.message_flow(destination="checker", unit=session2.credit_unit.message, value=0xFFFFFFFF)
        session2.message_flow(destination="checker", unit=session2.credit_unit.byte, value=0xFFFFFFFF)
        queue = session2.incoming("checker")

        msg3b = queue.get(timeout=1)
        msg5b = queue.get(timeout=1)

        self.assertEqual("Three", msg3b.body)
        self.assertEqual("Five", msg5b.body)

        try:
            extra = queue.get(timeout=1)
            self.fail("Got unexpected message: " + extra.body)
        except Empty: None

        self.assertEqual(0, session2.queue_query(queue="test-ack-queue").message_count)
        queueObj.update()
        self.assertEquals(queueObj.msgDepth, 0)
        self.assertEquals(queueObj.msgTotalEnqueues, 5)
        self.assertEquals(queueObj.msgTotalDequeues, 5)

        # Subscribe one last time to keep the queue available, and to verify
        # that the implied accept worked by verifying no messages have been
        # returned when session2 is closed.
        self.session.message_subscribe(queue = "test-ack-queue", destination = "final-checker")

        session2.close(timeout=10)

        # check the statistics - they should not have changed
        self.assertEqual(0, self.session.queue_query(queue="test-ack-queue").message_count)
        queueObj.update()
        self.assertEquals(queueObj.msgDepth, 0)
        self.assertEquals(queueObj.msgTotalEnqueues, 5)
        self.assertEquals(queueObj.msgTotalDequeues, 5)

        self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.message, value=0xFFFFFFFF)
        self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
        try:
            extra = self.session.incoming("final-checker").get(timeout=1)
            self.fail("Got unexpected message: " + extra.body)
        except Empty: None

    def test_reject(self):
        session = self.session
        session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout")
        session.queue_declare(queue = "r", exclusive=True, auto_delete=True)
        session.exchange_bind(queue = "r", exchange = "amq.fanout")

        session.message_subscribe(queue = "q", destination = "consumer")
        session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFF)
        session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFF)
        session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "blah, blah"))
        msg = session.incoming("consumer").get(timeout = 1)
        self.assertEquals(msg.body, "blah, blah")
        session.message_reject(RangedSet(msg.id))

        session.message_subscribe(queue = "r", destination = "checker")
        session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFF)
        session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFF)
        msg = session.incoming("checker").get(timeout = 1)
        self.assertEquals(msg.body, "blah, blah")

    def test_credit_flow_messages(self):
        """
        Test basic credit based flow control with unit = message
        """
        #declare an exclusive queue
        session = self.session
        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
        #create consumer (for now that defaults to infinite credit)
        session.message_subscribe(queue = "q", destination = "c")
        session.message_set_flow_mode(flow_mode = 0, destination = "c")
        #send batch of messages to queue
        for i in range(1, 11):
            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))

        #set message credit to finite amount (less than enough for all messages)
        session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
        #set infinite byte credit
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="c")
        #check that expected number were received
        q = session.incoming("c")
        for i in range(1, 6):
            self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
        self.assertEmpty(q)

        #increase credit again and check more are received
        for i in range(6, 11):
            session.message_flow(unit = session.credit_unit.message, value = 1, destination = "c")
            self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
            self.assertEmpty(q)

    def test_credit_flow_bytes(self):
        """
        Test basic credit based flow control with unit = bytes
        """
        #declare an exclusive queue
        session = self.session
        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
        #create consumer (for now that defaults to infinite credit)
        session.message_subscribe(queue = "q", destination = "c")
        session.message_set_flow_mode(flow_mode = 0, destination = "c")
        #send batch of messages to queue
        for i in range(10):
            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))

        #each message is currently interpreted as requiring msg_size bytes of credit
        msg_size = 19

        #set byte credit to finite amount (less than enough for all messages)
        session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c")
        #set infinite message credit
        session.message_flow(unit = session.credit_unit.message, value =0xFFFFFFFF, destination ="c")
        #check that expected number were received
        q = session.incoming("c")
        for i in range(5):
            self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
        self.assertEmpty(q)

        #increase credit again and check more are received
        for i in range(5):
            session.message_flow(unit = session.credit_unit.byte, value = msg_size, destination = "c")
            self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
            self.assertEmpty(q)


    def test_window_flow_messages(self):
        """
        Test basic window based flow control with unit = message
        """
        #declare an exclusive queue
        session = self.session
        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
        #create consumer (for now that defaults to infinite credit)
        session.message_subscribe(queue = "q", destination = "c")
        session.message_set_flow_mode(flow_mode = 1, destination = "c")
        #send batch of messages to queue
        for i in range(1, 11):
            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))

        #set message credit to finite amount (less than enough for all messages)
        session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
        #set infinite byte credit
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="c")
        #check that expected number were received
        q = session.incoming("c")
        ids = []
        for i in range(1, 6):            
            msg = q.get(timeout = 1)
            ids.append(msg.id)
            self.assertDataEquals(session, msg, "Message %d" % i)
        self.assertEmpty(q)

        #acknowledge messages and check more are received
        #TODO: there may be a nicer way of doing this
        for i in ids:
           session.receiver._completed.add(i)
        session.channel.session_completed(session.receiver._completed)

        for i in range(6, 11):
            self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
        self.assertEmpty(q)


    def test_window_flow_bytes(self):
        """
        Test basic window based flow control with unit = bytes
        """
        #declare an exclusive queue
        session = self.session
        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
        #create consumer (for now that defaults to infinite credit)
        session.message_subscribe(queue = "q", destination = "c")
        session.message_set_flow_mode(flow_mode = 1, destination = "c")
        #send batch of messages to queue
        for i in range(10):
            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))

        #each message is currently interpreted as requiring msg_size bytes of credit
        msg_size = 19

        #set byte credit to finite amount (less than enough for all messages)
        session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c")
        #set infinite message credit
        session.message_flow(unit = session.credit_unit.message, value =0xFFFFFFFF, destination ="c")
        #check that expected number were received
        q = session.incoming("c")
        msgs = []
        for i in range(5):
            msg = q.get(timeout = 1)
            msgs.append(msg)
            self.assertDataEquals(session, msg, "abcdefgh")
        self.assertEmpty(q)

        #ack each message individually and check more are received
        for i in range(5):
            msg = msgs.pop()
            #TODO: there may be a nicer way of doing this
            session.receiver._completed.add(msg.id)
            session.channel.session_completed(session.receiver._completed)
            self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
            self.assertEmpty(q)

    def test_window_flush_ack_flow(self):
        """
        Test basic window based flow control with unit = bytes
        """
        #declare an exclusive queue
        ssn = self.session
        ssn.queue_declare(queue = "q", exclusive=True, auto_delete=True)
        #create consumer
        ssn.message_subscribe(queue = "q", destination = "c",
                              accept_mode=ssn.accept_mode.explicit)
        ssn.message_set_flow_mode(flow_mode = ssn.flow_mode.window, destination = "c")

        #send message A
        ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "A"))

        for unit in ssn.credit_unit.VALUES:
            ssn.message_flow("c", unit, 0xFFFFFFFF)

        q = ssn.incoming("c")
        msgA = q.get(timeout=10)

        ssn.message_flush(destination="c")

        # XXX
        ssn.receiver._completed.add(msgA.id)
        ssn.channel.session_completed(ssn.receiver._completed)
        ssn.message_accept(RangedSet(msgA.id))

        for unit in ssn.credit_unit.VALUES:
            ssn.message_flow("c", unit, 0xFFFFFFFF)

        #send message B
        ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "B"))

        msgB = q.get(timeout=10)

    def test_window_stop(self):
        """
        Ensure window based flow control reacts to stop correctly
        """
        session = self.session
        #setup subscriber on a test queue
        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
        session.message_subscribe(queue = "q", destination = "c")
        session.message_set_flow_mode(flow_mode = 1, destination = "c")
        session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="c")


        #send batch of messages to queue
        for i in range(0, 10):
            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % (i+1)))

        #retrieve all delivered messages
        q = session.incoming("c")
        for i in range(0, 5):
            msg = q.get(timeout = 1)
            session.receiver._completed.add(msg.id)#TODO: this may be done automatically
            self.assertDataEquals(session, msg, "Message %d" % (i+1))

        session.message_stop(destination = "c")

        #now send completions, normally used to move window forward,
        #but after a stop should not do so
        session.channel.session_completed(session.receiver._completed)

        #check no more messages are sent
        self.assertEmpty(q)

        #re-establish window and check remaining messages
        session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="c")
        for i in range(0, 5):
            msg = q.get(timeout = 1)
            self.assertDataEquals(session, msg, "Message %d" % (i+6))

    def test_credit_window_after_messagestop(self):
        """
        Tests that the broker's credit window size doesnt exceed the requested value when completing
        previous messageTransfer commands after a message_stop and message_flow.
        """

        session = self.session

        #create queue
        session.queue_declare(queue = self.test_queue_name, exclusive=True, auto_delete=True)

        #send 11 messages
        for i in range(1, 12):
            session.message_transfer(message=Message(session.delivery_properties(routing_key=self.test_queue_name), "message-%d" % (i)))


        #subscribe:
        session.message_subscribe(queue=self.test_queue_name, destination="a")
        a = session.incoming("a")
        session.message_set_flow_mode(flow_mode = 1, destination = "a")
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="a")
        # issue 5 message credits
        session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a")

        # get 5 messages
        ids = RangedSet()
        for i in range(1, 6):
            msg = a.get(timeout = 1)
            self.assertEquals("message-%d" % (i), msg.body)
            ids.add(msg.id)

        # now try and read a 6th message. we expect this to fail due to exhausted message credit.
        try:
            extra = a.get(timeout=1)
            self.fail("Got unexpected message: " + extra.body)
        except Empty: None

        session.message_stop(destination = "a")

        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="a")
        session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a")

        # complete earlier messages after setting the window to 5 message credits
        session.channel.session_completed(ids)

        # Now continue to read the next 5 messages
        for i in range(6, 11):
            msg = a.get(timeout = 1)
            self.assertEquals("message-%d" % (i), msg.body)

        # now try and read the 11th message. we expect this to fail due to exhausted message credit.  If we receive an
        # 11th this indicates the broker is not respecting the client's requested window size.
        try:
            extra = a.get(timeout=1)
            self.fail("Got unexpected message: " + extra.body)
        except Empty: None

    def test_no_credit_wrap(self):
        """
        Ensure that adding credit does not result in wrapround, lowering the balance.
        """
        session = self.session

        session.queue_declare(queue = self.test_queue_name, exclusive=True, auto_delete=True)
        session.message_subscribe(queue=self.test_queue_name, destination="a")
        a = session.incoming("a")
        session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a")
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="a")
        session.message_flow(unit = session.credit_unit.message, value =0xFFFFFFFA, destination ="a")
        #test wraparound of credit balance does not occur
        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
        for i in range(1, 50):
            session.message_transfer(message=Message(session.delivery_properties(routing_key=self.test_queue_name), "message-%d" % (i)))
        session.message_flush(destination = "a")
        for i in range(1, 50):
            msg = a.get(timeout = 1)
            self.assertEquals("message-%d" % (i), msg.body)


    def test_subscribe_not_acquired(self):
        """
        Test the not-acquired modes works as expected for a simple case
        """
        session = self.session
        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
        for i in range(1, 6):
            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))

        session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
        session.message_flow(unit = session.credit_unit.message, value =0xFFFFFFFF, destination ="a")
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="a")
        session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1)
        session.message_flow(unit = session.credit_unit.message, value =0xFFFFFFFF, destination ="b")
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="b")

        for i in range(6, 11):
            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))

        #both subscribers should see all messages
        qA = session.incoming("a")
        qB = session.incoming("b")
        for i in range(1, 11):
            for q in [qA, qB]:
                msg = q.get(timeout = 1)
                self.assertEquals("Message %s" % i, msg.body)
                #TODO: tidy up completion
                session.receiver._completed.add(msg.id)

        #TODO: tidy up completion
        session.channel.session_completed(session.receiver._completed)
        #messages should still be on the queue:
        self.assertEquals(10, session.queue_query(queue = "q").message_count)

    def test_acquire_with_no_accept_and_credit_flow(self):
        """
        Test that messages recieved unacquired, with accept not
        required in windowing mode can be acquired.
        """
        session = self.session
        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)

        session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))

        session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, accept_mode = 1)
        session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a")
        session.message_flow(unit = session.credit_unit.message, value =0xFFFFFFFF, destination ="a")
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="a")
        msg = session.incoming("a").get(timeout = 1)
        self.assertEquals("acquire me", msg.body)
        #message should still be on the queue:
        self.assertEquals(1, session.queue_query(queue = "q").message_count)

        transfers = RangedSet(msg.id)
        response = session.message_acquire(transfers)
        #check that we get notification (i.e. message_acquired)
        self.assert_(msg.id in response.transfers)
        #message should have been removed from the queue:
        self.assertEquals(0, session.queue_query(queue = "q").message_count)

    def test_acquire(self):
        """
        Test explicit acquire function
        """
        session = self.session
        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)

        session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))

        session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
        session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFF)
        session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFF)
        msg = session.incoming("a").get(timeout = 1)
        self.assertEquals("acquire me", msg.body)
        #message should still be on the queue:
        self.assertEquals(1, session.queue_query(queue = "q").message_count)

        transfers = RangedSet(msg.id)
        response = session.message_acquire(transfers)
        #check that we get notification (i.e. message_acquired)
        self.assert_(msg.id in response.transfers)
        #message should have been removed from the queue:
        self.assertEquals(0, session.queue_query(queue = "q").message_count)
        session.message_accept(transfers)


    def test_release(self):
        """
        Test explicit release function
        """
        session = self.session
        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)

        session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "release me"))

        session.message_subscribe(queue = "q", destination = "a")
        session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFF)
        session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFF)
        msg = session.incoming("a").get(timeout = 1)
        self.assertEquals("release me", msg.body)
        session.message_cancel(destination = "a")
        session.message_release(RangedSet(msg.id))

        #message should not have been removed from the queue:
        self.assertEquals(1, session.queue_query(queue = "q").message_count)

    def test_release_ordering(self):
        """
        Test order of released messages is as expected
        """
        session = self.session
        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
        for i in range (1, 11):
            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "released message %s" % (i)))

        session.message_subscribe(queue = "q", destination = "a")
        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="a")
        queue = session.incoming("a")
        first = queue.get(timeout = 1)
        for i in range(2, 10):
            msg = queue.get(timeout = 1)
            self.assertEquals("released message %s" % (i), msg.body)
            
        last = queue.get(timeout = 1)
        self.assertEmpty(queue)
        released = RangedSet()
        released.add(first.id, last.id)
        session.message_release(released)

        #TODO: may want to clean this up...
        session.receiver._completed.add(first.id, last.id)
        session.channel.session_completed(session.receiver._completed)
        
        for i in range(1, 11):
            self.assertEquals("released message %s" % (i), queue.get(timeout = 1).body)

    def test_ranged_ack(self):
        """
        Test acking of messages ranges
        """
        session = self.conn.session("alternate-session", timeout=10)

        session.queue_declare(queue = "q", auto_delete=True)
        delivery_properties = session.delivery_properties(routing_key="q")
        for i in range (1, 11):
            session.message_transfer(message=Message(delivery_properties, "message %s" % (i)))

        session.message_subscribe(queue = "q", destination = "a")
        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="a")
        queue = session.incoming("a")
        ids = []
        for i in range (1, 11):
            msg = queue.get(timeout = 1)
            self.assertEquals("message %s" % (i), msg.body)
            ids.append(msg.id)
            
        self.assertEmpty(queue)

        #ack all but the fourth message (command id 2)
        accepted = RangedSet()
        accepted.add(ids[0], ids[2])
        accepted.add(ids[4], ids[9])
        session.message_accept(accepted)

        #subscribe from second session here to ensure queue is not
        #auto-deleted when alternate session closes (no need to ack on these):
        self.session.message_subscribe(queue = "q", destination = "checker")

        #now close the session, and see that the unacked messages are
        #then redelivered to another subscriber:
        session.close(timeout=10)

        session = self.session
        session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFF)
        session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFF)
        queue = session.incoming("checker")

        self.assertEquals("message 4", queue.get(timeout = 1).body)
        self.assertEmpty(queue)

    def test_subscribe_not_acquired_2(self):
        session = self.session

        #publish some messages
        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
        for i in range(1, 11):
            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))

        #consume some of them
        session.message_subscribe(queue = "q", destination = "a")
        session.message_set_flow_mode(flow_mode = 0, destination = "a")
        session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a")
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="a")

        queue = session.incoming("a")
        for i in range(1, 6):
            msg = queue.get(timeout = 1)
            self.assertEquals("message-%d" % (i), msg.body)
            #complete and accept
            session.message_accept(RangedSet(msg.id))
            #TODO: tidy up completion
            session.receiver._completed.add(msg.id)
            session.channel.session_completed(session.receiver._completed)
        self.assertEmpty(queue)

        #now create a not-acquired subscriber
        session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="b")

        #check it gets those not consumed
        queue = session.incoming("b")
        session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
        for i in range(6, 11):
            msg = queue.get(timeout = 1)
            self.assertEquals("message-%d" % (i), msg.body)
            session.message_release(RangedSet(msg.id))
            #TODO: tidy up completion
            session.receiver._completed.add(msg.id)
            session.channel.session_completed(session.receiver._completed)
        session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
        self.assertEmpty(queue)

        #check all 'browsed' messages are still on the queue
        self.assertEqual(5, session.queue_query(queue="q").message_count)

    def test_subscribe_not_acquired_3(self):
        session = self.session

        #publish some messages
        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
        for i in range(1, 11):
            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))

        #create a not-acquired subscriber
        session.message_subscribe(queue = "q", destination = "a", acquire_mode=1)
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="a")
        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")

        #browse through messages
        queue = session.incoming("a")
        for i in range(1, 11):
            msg = queue.get(timeout = 1)
            self.assertEquals("message-%d" % (i), msg.body)
            if (i % 2):
                #try to acquire every second message
                response = session.message_acquire(RangedSet(msg.id))
                #check that acquire succeeds
                self.assert_(msg.id in response.transfers)
                session.message_accept(RangedSet(msg.id))
            else:
                session.message_release(RangedSet(msg.id))
            session.receiver._completed.add(msg.id)
            session.channel.session_completed(session.receiver._completed)
        self.assertEmpty(queue)

        #create a second not-acquired subscriber
        session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="b")
        session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
        #check it gets those not consumed
        queue = session.incoming("b")
        for i in [2,4,6,8,10]:
            msg = queue.get(timeout = 1)
            self.assertEquals("message-%d" % (i), msg.body)
            session.message_release(RangedSet(msg.id))
            session.receiver._completed.add(msg.id)
            session.channel.session_completed(session.receiver._completed)
        session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
        self.assertEmpty(queue)

        #check all 'browsed' messages are still on the queue
        self.assertEqual(5, session.queue_query(queue="q").message_count)

    def test_release_unacquired(self):
        session = self.session

        #create queue
        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)

        #send message
        session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "my-message"))

        #create two 'browsers'
        session.message_subscribe(queue = "q", destination = "a", acquire_mode=1)
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="a")
        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
        queueA = session.incoming("a")

        session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="b")
        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "b")
        queueB = session.incoming("b")
        
        #have each browser release the message
        msgA = queueA.get(timeout = 1)
        session.message_release(RangedSet(msgA.id))

        msgB = queueB.get(timeout = 1)
        session.message_release(RangedSet(msgB.id))
        
        #cancel browsers
        session.message_cancel(destination = "a")
        session.message_cancel(destination = "b")
        
        #create consumer
        session.message_subscribe(queue = "q", destination = "c")
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="c")
        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "c")
        queueC = session.incoming("c")
        #consume the message then ack it
        msgC = queueC.get(timeout = 1)
        session.message_accept(RangedSet(msgC.id))
        #ensure there are no other messages
        self.assertEmpty(queueC)

    def test_release_order(self):
        session = self.session

        #create queue
        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)

        #send messages
        for i in range(1, 11):
            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))

        #subscribe:
        session.message_subscribe(queue="q", destination="a")
        a = session.incoming("a")
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination ="a")
        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")

        # receive all messages into list
        messages = []
        for i in range(1, 11):
            msg = a.get(timeout = 1)
            self.assertEquals("message-%d" % (i), msg.body)
            messages.append(msg)

        # accept/release received messages
        for i, msg in enumerate(messages, start=1):
            if (i % 2):
                #accept all odd messages
                session.message_accept(RangedSet(msg.id))
            else:
                #release all even messages
                session.message_release(RangedSet(msg.id))

        session.message_subscribe(queue="q", destination="b", acquire_mode=0)
        b = session.incoming("b")
        b.start()
        for i in [2, 4, 6, 8, 10]:
            msg = b.get(timeout = 1)
            self.assertEquals("message-%d" % (i), msg.body)


    def test_empty_body(self):
        session = self.session
        session.queue_declare(queue="xyz", exclusive=True, auto_delete=True)
        props = session.delivery_properties(routing_key="xyz")
        session.message_transfer(message=Message(props, ""))

        consumer_tag = "tag1"
        session.message_subscribe(queue="xyz", destination=consumer_tag)
        session.message_flow(unit = session.credit_unit.message, value =0xFFFFFFFF, destination = consumer_tag)
        session.message_flow(unit = session.credit_unit.byte, value =0xFFFFFFFF, destination = consumer_tag)
        queue = session.incoming(consumer_tag)
        msg = queue.get(timeout=1)
        self.assertEquals("", msg.body)
        session.message_accept(RangedSet(msg.id))

    def test_incoming_start(self):
        q = "test_incoming_start"
        session = self.session

        session.queue_declare(queue=q, exclusive=True, auto_delete=True)
        session.message_subscribe(queue=q, destination="msgs")
        messages = session.incoming("msgs")
        assert messages.destination == "msgs"

        dp = session.delivery_properties(routing_key=q)
        session.message_transfer(message=Message(dp, "test"))

        messages.start()
        msg = messages.get()
        assert msg.body == "test"

    def test_ttl(self):
        q = "test_ttl"
        session = self.session

        session.queue_declare(queue=q, exclusive=True, auto_delete=True)

        dp = session.delivery_properties(routing_key=q, ttl=500)#expire in half a second
        session.message_transfer(message=Message(dp, "first"))

        dp = session.delivery_properties(routing_key=q, ttl=300000)#expire in fives minutes
        session.message_transfer(message=Message(dp, "second"))

        d = "msgs"
        session.message_subscribe(queue=q, destination=d)
        messages = session.incoming(d)
        sleep(1)
        session.message_flow(unit = session.credit_unit.message, value=2, destination=d)
        session.message_flow(unit = session.credit_unit.byte, value=0xFFFFFFFF, destination=d)
        assert messages.get(timeout=1).body == "second"
        self.assertEmpty(messages)

    def assertDataEquals(self, session, msg, expected):
        self.assertEquals(expected, msg.body)

    def assertEmpty(self, queue):
        try:
            extra = queue.get(timeout=1)
            self.fail("Queue not empty, contains: " + extra.body)
        except Empty: None

class SizelessContent(Content):

    def size(self):
        return None
