#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import absolute_import
from qpid.client import Client, Closed
from qpid.queue import Empty
from qpid.datatypes import Message, RangedSet
from qpid.session import SessionException
from qpid.testlib import TestBase010
from qpid.compat import set
from struct import pack, unpack
from time import sleep

class DtxTests(TestBase010):
    """
    Tests for the amqp dtx related classes.

    Tests of the form test_simple_xxx test the basic transactional
    behaviour. The approach here is to 'swap' a message from one queue
    to another by consuming and re-publishing in the same
    transaction. That transaction is then completed in different ways
    and the appropriate result verified.

    The other tests enforce more specific rules and behaviour on a
    per-method or per-field basis.
    """

    XA_RBROLLBACK = 1
    XA_RBTIMEOUT = 2
    XA_OK = 0
    tx_counter = 0

    def reset_channel(self):
        self.session.close()
        self.session = self.conn.session("dtx-session", 1)

    def test_simple_commit(self):
        """
        Test basic one-phase commit behaviour.
        """
        guard = self.keepQueuesAlive(["queue-a", "queue-b"])
        session = self.session
        tx = self.xid("my-xid")
        self.txswap(tx, "commit")

        #neither queue should have any messages accessible
        self.assertMessageCount(0, "queue-a")
        self.assertMessageCount(0, "queue-b")

        #commit
        self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=True).status)

        #should close and reopen session to ensure no unacked messages are held
        self.reset_channel()

        #check result
        self.assertMessageCount(0, "queue-a")
        self.assertMessageCount(1, "queue-b")
        self.assertMessageId("commit", "queue-b")

    def test_simple_prepare_commit(self):
        """
        Test basic two-phase commit behaviour.
        """
        guard = self.keepQueuesAlive(["queue-a", "queue-b"])
        session = self.session
        tx = self.xid("my-xid")
        self.txswap(tx, "prepare-commit")

        #prepare
        self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status)

        #neither queue should have any messages accessible
        self.assertMessageCount(0, "queue-a")
        self.assertMessageCount(0, "queue-b")

        #commit
        self.assertEqual(self.XA_OK, session.dtx_commit(xid=tx, one_phase=False).status)

        self.reset_channel()

        #check result
        self.assertMessageCount(0, "queue-a")
        self.assertMessageCount(1, "queue-b")
        self.assertMessageId("prepare-commit", "queue-b")


    def test_simple_rollback(self):
        """
        Test basic rollback behaviour.
        """
        guard = self.keepQueuesAlive(["queue-a", "queue-b"])
        session = self.session
        tx = self.xid("my-xid")
        self.txswap(tx, "rollback")

        #neither queue should have any messages accessible
        self.assertMessageCount(0, "queue-a")
        self.assertMessageCount(0, "queue-b")

        #rollback
        self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)

        self.reset_channel()

        #check result
        self.assertMessageCount(1, "queue-a")
        self.assertMessageCount(0, "queue-b")
        self.assertMessageId("rollback", "queue-a")

    def test_simple_prepare_rollback(self):
        """
        Test basic rollback behaviour after the transaction has been prepared.
        """
        guard = self.keepQueuesAlive(["queue-a", "queue-b"])
        session = self.session
        tx = self.xid("my-xid")
        self.txswap(tx, "prepare-rollback")

        #prepare
        self.assertEqual(self.XA_OK, session.dtx_prepare(xid=tx).status)

        #neither queue should have any messages accessible
        self.assertMessageCount(0, "queue-a")
        self.assertMessageCount(0, "queue-b")

        #rollback
        self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)

        self.reset_channel()

        #check result
        self.assertMessageCount(1, "queue-a")
        self.assertMessageCount(0, "queue-b")
        self.assertMessageId("prepare-rollback", "queue-a")

    def test_select_required(self):
        """
        check that an error is flagged if select is not issued before
        start or end
        """
        session = self.session
        tx = self.xid("dummy")
        try:
            session.dtx_start(xid=tx)

            #if we get here we have failed, but need to do some cleanup:
            session.dtx_end(xid=tx)
            session.dtx_rollback(xid=tx)
            self.fail("Session not selected for use with dtx, expected exception!")
        except SessionException as e:
            self.assertEquals(503, e.args[0].error_code)

    def test_start_already_known(self):
        """
        Verify that an attempt to start an association with a
        transaction that is already known is not allowed (unless the
        join flag is set).
        """
        #create two sessions on different connection & select them for use with dtx:
        session1 = self.session
        session1.dtx_select()

        other = self.connect()
        session2 = other.session("other", 0)
        session2.dtx_select()

        #create a xid
        tx = self.xid("dummy")
        #start work on one session under that xid:
        session1.dtx_start(xid=tx)
        #then start on the other without the join set
        failed = False
        try:
            session2.dtx_start(xid=tx)
        except SessionException as e:
            failed = True
            error = e

        #cleanup:
        if not failed:
            session2.dtx_end(xid=tx)
            other.close()
        session1.dtx_end(xid=tx)
        session1.dtx_rollback(xid=tx)

        #verification:
        if failed: self.assertEquals(530, error.args[0].error_code)
        else: self.fail("Xid already known, expected exception!")

    def test_forget_xid_on_completion(self):
        """
        Verify that a xid is 'forgotten' - and can therefore be used
        again - once it is completed.
        """
        #do some transactional work & complete the transaction
        self.test_simple_commit()
        # session has been reset, so reselect for use with dtx
        self.session.dtx_select()

        #start association for the same xid as the previously completed txn
        tx = self.xid("my-xid")
        self.session.dtx_start(xid=tx)
        self.session.dtx_end(xid=tx)
        self.session.dtx_rollback(xid=tx)

    def test_start_join_and_resume(self):
        """
        Ensure the correct error is signalled when both the join and
        resume flags are set on starting an association between a
        session and a transcation.
        """
        session = self.session
        session.dtx_select()
        tx = self.xid("dummy")
        try:
            session.dtx_start(xid=tx, join=True, resume=True)
            #failed, but need some cleanup:
            session.dtx_end(xid=tx)
            session.dtx_rollback(xid=tx)
            self.fail("Join and resume both set, expected exception!")
        except SessionException as e:
            self.assertEquals(503, e.args[0].error_code)

    def test_start_join(self):
        """
        Verify 'join' behaviour, where a session is associated with a
        transaction that is already associated with another session.
        """
        guard = self.keepQueuesAlive(["one", "two"])
        #create two sessions & select them for use with dtx:
        session1 = self.session
        session1.dtx_select()

        session2 = self.conn.session("second", 2)
        session2.dtx_select()

        #setup
        session1.queue_declare(queue="one", auto_delete=True)
        session1.queue_declare(queue="two", auto_delete=True)
        session1.message_transfer(self.createMessage(session1, "one", "a", "DtxMessage"))
        session1.message_transfer(self.createMessage(session1, "two", "b", "DtxMessage"))

        #create a xid
        tx = self.xid("dummy")
        #start work on one session under that xid:
        session1.dtx_start(xid=tx)
        #then start on the other with the join flag set
        session2.dtx_start(xid=tx, join=True)

        #do work through each session
        self.swap(session1, "one", "two")#swap 'a' from 'one' to 'two'
        self.swap(session2, "two", "one")#swap 'b' from 'two' to 'one'

        #mark end on both sessions
        session1.dtx_end(xid=tx)
        session2.dtx_end(xid=tx)

        #commit and check
        session1.dtx_commit(xid=tx, one_phase=True)
        self.assertMessageCount(1, "one")
        self.assertMessageCount(1, "two")
        self.assertMessageId("a", "two")
        self.assertMessageId("b", "one")


    def test_suspend_resume(self):
        """
        Test suspension and resumption of an association
        """
        session = self.session
        session.dtx_select()

        #setup
        session.queue_declare(queue="one", exclusive=True, auto_delete=True)
        session.queue_declare(queue="two", exclusive=True, auto_delete=True)
        session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage"))
        session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage"))

        tx = self.xid("dummy")

        session.dtx_start(xid=tx)
        self.swap(session, "one", "two")#swap 'a' from 'one' to 'two'
        session.dtx_end(xid=tx, suspend=True)

        session.dtx_start(xid=tx, resume=True)
        self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
        session.dtx_end(xid=tx)

        #commit and check
        session.dtx_commit(xid=tx, one_phase=True)
        self.assertMessageCount(1, "one")
        self.assertMessageCount(1, "two")
        self.assertMessageId("a", "two")
        self.assertMessageId("b", "one")

    def test_suspend_start_end_resume(self):
        """
        Test suspension and resumption of an association with work
        done on another transaction when the first transaction is
        suspended
        """
        session = self.session
        session.dtx_select()

        #setup
        session.queue_declare(queue="one", exclusive=True, auto_delete=True)
        session.queue_declare(queue="two", exclusive=True, auto_delete=True)
        session.message_transfer(self.createMessage(session, "one", "a", "DtxMessage"))
        session.message_transfer(self.createMessage(session, "two", "b", "DtxMessage"))

        tx = self.xid("dummy")

        session.dtx_start(xid=tx)
        self.swap(session, "one", "two")#swap 'a' from 'one' to 'two'
        session.dtx_end(xid=tx, suspend=True)

        session.dtx_start(xid=tx, resume=True)
        self.swap(session, "two", "one")#swap 'b' from 'two' to 'one'
        session.dtx_end(xid=tx)

        #commit and check
        session.dtx_commit(xid=tx, one_phase=True)
        self.assertMessageCount(1, "one")
        self.assertMessageCount(1, "two")
        self.assertMessageId("a", "two")
        self.assertMessageId("b", "one")

    def test_end_suspend_and_fail(self):
        """
        Verify that the correct error is signalled if the suspend and
        fail flag are both set when disassociating a transaction from
        the session
        """
        session = self.session
        session.dtx_select()
        tx = self.xid("suspend_and_fail")
        session.dtx_start(xid=tx)
        try:
            session.dtx_end(xid=tx, suspend=True, fail=True)
            self.fail("Suspend and fail both set, expected exception!")
        except SessionException as e:
            self.assertEquals(503, e.args[0].error_code)

        #cleanup
        other = self.connect()
        session = other.session("cleanup", 1)
        session.dtx_rollback(xid=tx)
        session.close()
        other.close()


    def test_end_unknown_xid(self):
        """
        Verifies that the correct exception is thrown when an attempt
        is made to end the association for a xid not previously
        associated with the session
        """
        session = self.session
        session.dtx_select()
        tx = self.xid("unknown-xid")
        try:
            session.dtx_end(xid=tx)
            self.fail("Attempted to end association with unknown xid, expected exception!")
        except SessionException as e:
            self.assertEquals(409, e.args[0].error_code)

    def test_end(self):
        """
        Verify that the association is terminated by end and subsequent
        operations are non-transactional
        """
        guard = self.keepQueuesAlive(["tx-queue"])
        session = self.conn.session("alternate", 1)

        #publish a message under a transaction
        session.dtx_select()
        tx = self.xid("dummy")
        session.dtx_start(xid=tx)
        session.message_transfer(self.createMessage(session, "tx-queue", "one", "DtxMessage"))
        session.dtx_end(xid=tx)

        #now that association with txn is ended, publish another message
        session.message_transfer(self.createMessage(session, "tx-queue", "two", "DtxMessage"))

        #check the second message is available, but not the first
        self.assertMessageCount(1, "tx-queue")
        self.subscribe(session, queue="tx-queue", destination="results")
        msg = session.incoming("results").get(timeout=1)
        self.assertEqual("two", self.getMessageProperty(msg, 'correlation_id'))
        session.message_cancel(destination="results")
        #ack the message then close the session
        session.message_accept(RangedSet(msg.id))
        session.close()

        session = self.session
        #commit the transaction and check that the first message (and
        #only the first message) is then delivered
        session.dtx_commit(xid=tx, one_phase=True)
        self.assertMessageCount(1, "tx-queue")
        self.assertMessageId("one", "tx-queue")

    def test_invalid_commit_one_phase_true(self):
        """
        Test that a commit with one_phase = True is rejected if the
        transaction in question has already been prepared.
        """
        other = self.connect()
        tester = other.session("tester", 1)
        tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
        tester.dtx_select()
        tx = self.xid("dummy")
        tester.dtx_start(xid=tx)
        tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
        tester.dtx_end(xid=tx)
        tester.dtx_prepare(xid=tx)
        failed = False
        try:
            tester.dtx_commit(xid=tx, one_phase=True)
        except SessionException as e:
            failed = True
            error = e

        if failed:
            self.session.dtx_rollback(xid=tx)
            self.assertEquals(409, error.args[0].error_code)
        else:
            tester.close()
            other.close()
            self.fail("Invalid use of one_phase=True, expected exception!")

    def test_invalid_commit_one_phase_false(self):
        """
        Test that a commit with one_phase = False is rejected if the
        transaction in question has not yet been prepared.
        """
        other = self.connect()
        tester = other.session("tester", 1)
        tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
        tester.dtx_select()
        tx = self.xid("dummy")
        tester.dtx_start(xid=tx)
        tester.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))
        tester.dtx_end(xid=tx)
        failed = False
        try:
            tester.dtx_commit(xid=tx, one_phase=False)
        except SessionException as e:
            failed = True
            error = e

        if failed:
            self.session.dtx_rollback(xid=tx)
            self.assertEquals(409, error.args[0].error_code)
        else:
            tester.close()
            other.close()
            self.fail("Invalid use of one_phase=False, expected exception!")

    def test_invalid_commit_not_ended(self):
        """
        Test that a commit fails if the xid is still associated with a session.
        """
        other = self.connect()
        tester = other.session("tester", 1)
        self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
        self.session.dtx_select()
        tx = self.xid("dummy")
        self.session.dtx_start(xid=tx)
        self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))

        failed = False
        try:
            tester.dtx_commit(xid=tx, one_phase=False)
        except SessionException as e:
            failed = True
            error = e

        if failed:
            self.session.dtx_end(xid=tx)
            self.session.dtx_rollback(xid=tx)
            self.assertEquals(409, error.args[0].error_code)
        else:
            tester.close()
            other.close()
            self.fail("Commit should fail as xid is still associated!")

    def test_invalid_rollback_not_ended(self):
        """
        Test that a rollback fails if the xid is still associated with a session.
        """
        other = self.connect()
        tester = other.session("tester", 1)
        self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
        self.session.dtx_select()
        tx = self.xid("dummy")
        self.session.dtx_start(xid=tx)
        self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))

        failed = False
        try:
            tester.dtx_rollback(xid=tx)
        except SessionException as e:
            failed = True
            error = e

        if failed:
            self.session.dtx_end(xid=tx)
            self.session.dtx_rollback(xid=tx)
            self.assertEquals(409, error.args[0].error_code)
        else:
            tester.close()
            other.close()
            self.fail("Rollback should fail as xid is still associated!")


    def test_invalid_prepare_not_ended(self):
        """
        Test that a prepare fails if the xid is still associated with a session.
        """
        other = self.connect()
        tester = other.session("tester", 1)
        self.session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
        self.session.dtx_select()
        tx = self.xid("dummy")
        self.session.dtx_start(xid=tx)
        self.session.message_transfer(self.createMessage(tester, "dummy", "dummy", "whatever"))

        failed = False
        try:
            tester.dtx_prepare(xid=tx)
        except SessionException as e:
            failed = True
            error = e

        if failed:
            self.session.dtx_end(xid=tx)
            self.session.dtx_rollback(xid=tx)
            self.assertEquals(409, error.args[0].error_code)
        else:
            tester.close()
            other.close()
            self.fail("Rollback should fail as xid is still associated!")

    def test_implicit_end(self):
        """
        Test that an association is implicitly ended when the session
        is closed (whether by exception or explicit client request)
        and the transaction in question is marked as rollback only.
        """
        session1 = self.session
        session2 = self.conn.session("other", 2)

        #setup:
        session2.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
        session2.message_transfer(self.createMessage(session2, "dummy", "a", "whatever"))
        tx = self.xid("dummy")

        session2.dtx_select()
        session2.dtx_start(xid=tx)
        session2.message_subscribe(queue="dummy", destination="dummy")
        session2.message_flow(destination="dummy", unit=session2.credit_unit.message, value=1)
        session2.message_flow(destination="dummy", unit=session2.credit_unit.byte, value=0xFFFFFFFF)
        msg = session2.incoming("dummy").get(timeout=1)
        session2.message_accept(RangedSet(msg.id))
        session2.message_cancel(destination="dummy")
        session2.message_transfer(self.createMessage(session2, "dummy", "b", "whatever"))
        session2.close()

        self.assertEqual(self.XA_RBROLLBACK, session1.dtx_prepare(xid=tx).status)
        session1.dtx_rollback(xid=tx)

    def test_get_timeout(self):
        """
        Check that get-timeout returns the correct value, (and that a
        transaction with a timeout can complete normally)
        """
        session = self.session
        tx = self.xid("dummy")

        session.dtx_select()
        session.dtx_start(xid=tx)
        # below test checks for default value of dtx-default-timeout broker option
        self.assertEqual(60, session.dtx_get_timeout(xid=tx).timeout)
        session.dtx_set_timeout(xid=tx, timeout=200)
        self.assertEqual(200, session.dtx_get_timeout(xid=tx).timeout)
        self.assertEqual(self.XA_OK, session.dtx_end(xid=tx).status)
        self.assertEqual(self.XA_OK, session.dtx_rollback(xid=tx).status)

    def test_set_timeout(self):
        """
        Test the timeout of a transaction results in the expected
        behaviour
        """

        guard = self.keepQueuesAlive(["queue-a", "queue-b"])
        #open new session to allow self.session to be used in checking the queue
        session = self.conn.session("worker", 1)
        #setup:
        tx = self.xid("dummy")
        session.queue_declare(queue="queue-a", auto_delete=True)
        session.queue_declare(queue="queue-b", auto_delete=True)
        session.message_transfer(self.createMessage(session, "queue-a", "timeout", "DtxMessage"))

        session.dtx_select()
        session.dtx_start(xid=tx)
        self.swap(session, "queue-a", "queue-b")
        session.dtx_set_timeout(xid=tx, timeout=2)
        sleep(3)
        #check that the work has been rolled back already
        self.assertMessageCount(1, "queue-a")
        self.assertMessageCount(0, "queue-b")
        self.assertMessageId("timeout", "queue-a")
        #check the correct codes are returned when we try to complete the txn
        self.assertEqual(self.XA_RBTIMEOUT, session.dtx_end(xid=tx).status)
        self.assertEqual(self.XA_RBTIMEOUT, session.dtx_rollback(xid=tx).status)

    def test_set_timeout_too_high(self):
        """
        Test the timeout can't be more than --dtx-max-timeout
        broker option
        """
        session = self.session
        tx = self.xid("dummy")

        session.dtx_select()
        session.dtx_start(xid=tx)
        try:
            session.dtx_set_timeout(xid=tx, timeout=3601)
        except SessionException as e:
            self.assertEquals(542, e.args[0].error_code)



    def test_recover(self):
        """
        Test basic recover behaviour
        """
        session = self.session

        session.dtx_select()
        session.queue_declare(queue="dummy", exclusive=True, auto_delete=True)

        prepared = []
        for i in range(1, 10):
            tx = self.xid("tx%s" % (i))
            session.dtx_start(xid=tx)
            session.message_transfer(self.createMessage(session, "dummy", "message%s" % (i), "message%s" % (i)))
            session.dtx_end(xid=tx)
            if i in [2, 5, 6, 8]:
                session.dtx_prepare(xid=tx)
                prepared.append(tx)
            else:
                session.dtx_rollback(xid=tx)

        xids = session.dtx_recover().in_doubt

        #rollback the prepared transactions returned by recover
        for x in xids:
            session.dtx_rollback(xid=x)

        #validate against the expected list of prepared transactions
        actual = set([x.global_id for x in xids]) #TODO: come up with nicer way to test these
        expected = set([x.global_id for x in prepared])
        intersection = actual.intersection(expected)

        if intersection != expected:
            missing = expected.difference(actual)
            extra = actual.difference(expected)
            self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))

    def test_bad_resume(self):
        """
        Test that a resume on a session not selected for use with dtx fails
        """
        session = self.session
        try:
            session.dtx_start(resume=True)
        except SessionException as e:
            self.assertEquals(503, e.args[0].error_code)

    def test_prepare_unknown(self):
        session = self.session
        try:
            session.dtx_prepare(xid=self.xid("unknown"))
        except SessionException as e:
            self.assertEquals(404, e.args[0].error_code)

    def test_commit_unknown(self):
        session = self.session
        try:
            session.dtx_commit(xid=self.xid("unknown"))
        except SessionException as e:
            self.assertEquals(404, e.args[0].error_code)

    def test_rollback_unknown(self):
        session = self.session
        try:
            session.dtx_rollback(xid=self.xid("unknown"))
        except SessionException as e:
            self.assertEquals(404, e.args[0].error_code)

    def test_get_timeout_unknown(self):
        session = self.session
        try:
            session.dtx_get_timeout(xid=self.xid("unknown"))
        except SessionException as e:
            self.assertEquals(404, e.args[0].error_code)

    def xid(self, txid):
        DtxTests.tx_counter += 1
        branchqual = "v%s" % DtxTests.tx_counter
        return self.session.xid(format=0, global_id=txid, branch_id=branchqual)

    def txswap(self, tx, id):
        session = self.session
        #declare two queues:
        session.queue_declare(queue="queue-a", auto_delete=True)
        session.queue_declare(queue="queue-b", auto_delete=True)

        #put message with specified id on one queue:
        dp=session.delivery_properties(routing_key="queue-a")
        mp=session.message_properties(correlation_id=id)
        session.message_transfer(message=Message(dp, mp, "DtxMessage"))

        #start the transaction:
        session.dtx_select()
        self.assertEqual(self.XA_OK, self.session.dtx_start(xid=tx).status)

        #'swap' the message from one queue to the other, under that transaction:
        self.swap(self.session, "queue-a", "queue-b")

        #mark the end of the transactional work:
        self.assertEqual(self.XA_OK, self.session.dtx_end(xid=tx).status)

    def swap(self, session, src, dest):
        #consume from src:
        session.message_subscribe(destination="temp-swap", queue=src)
        session.message_flow(destination="temp-swap", unit=session.credit_unit.message, value=1)
        session.message_flow(destination="temp-swap", unit=session.credit_unit.byte, value=0xFFFFFFFF)
        msg = session.incoming("temp-swap").get(timeout=1)
        session.message_cancel(destination="temp-swap")
        session.message_accept(RangedSet(msg.id))
        #todo: also complete at this point?

        #re-publish to dest:
        dp=session.delivery_properties(routing_key=dest)
        mp=session.message_properties(correlation_id=self.getMessageProperty(msg, 'correlation_id'))
        session.message_transfer(message=Message(dp, mp, msg.body))

    def assertMessageCount(self, expected, queue):
        self.assertEqual(expected, self.session.queue_query(queue=queue).message_count)

    def assertMessageId(self, expected, queue):
        self.session.message_subscribe(queue=queue, destination="results")
        self.session.message_flow(destination="results", unit=self.session.credit_unit.message, value=1)
        self.session.message_flow(destination="results", unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
        self.assertEqual(expected, self.getMessageProperty(self.session.incoming("results").get(timeout=1), 'correlation_id'))
        self.session.message_cancel(destination="results")

    def getMessageProperty(self, msg, prop):
        for h in msg.headers:
            if hasattr(h, prop): return getattr(h, prop)
        return None

    def keepQueuesAlive(self, names):
        session = self.conn.session("nasty", 99)
        for n in names:
            session.queue_declare(queue=n, auto_delete=True)
            session.message_subscribe(destination=n, queue=n)
        return session

    def createMessage(self, session, key, id, body):
        dp=session.delivery_properties(routing_key=key)
        mp=session.message_properties(correlation_id=id)
        session.message_transfer(message=Message(dp, mp, body))
