def perform_txn_work()

in qpid_tests/broker_1_0/tx.py [0:0]


    def perform_txn_work(self, session, name_a, name_b, name_c):
        """
        Utility method that does some setup and then some work after calling
        tx_select on the session. Used for testing both commit and rollback;
        no commit or rollback is issued here.

        Steps, in order:

        1. Declare the three queues; bind name_b to "amq.direct" with key
           "my_key_<name_b>" and name_c to "amq.topic" with key
           "my_topic_<name_c>".
        2. Before tx_select: publish "Message 1".."Message 4" with routing
           key name_a, "Message 6" to amq.direct and "Message 7" to amq.topic
           (there is no "Message 5").
        3. Call session.tx_select().
        4. Subscribe to each queue, fetch those six messages, check their
           bodies and pass their ids to session.message_accept().
        5. Publish "TxMessage 1".."TxMessage 4" to amq.topic, "TxMessage 6"
           to amq.direct and "TxMessage 7" with routing key name_a.

        :param session: session object on which all bind / transfer / tx /
            accept calls are made
        :param name_a: name of the queue addressed directly by routing key
        :param name_b: name of the queue bound to amq.direct
        :param name_c: name of the queue bound to amq.topic
        :return: tuple (queue_a, queue_b, queue_c, acked): the incoming
            queues for destinations "sub_a", "sub_b", "sub_c", and the
            RangedSet of message ids that was passed to message_accept
        """
        #setup:
        self.declare_queues([name_a, name_b, name_c])

        key = "my_key_" + name_b
        topic = "my_topic_" + name_c

        session.exchange_bind(queue=name_b, exchange="amq.direct", binding_key=key)
        session.exchange_bind(queue=name_c, exchange="amq.topic", binding_key=topic)

        #pre-populate the queues; these transfers happen before tx_select
        #no destination is given here, only routing_key=name_a
        #(presumably this goes through the default exchange -- confirm)
        dp = session.delivery_properties(routing_key=name_a)
        for i in range(1, 5):
            mp = session.message_properties(message_id="msg%d" % i)
            session.message_transfer(message=Message(dp, mp, "Message %d" % i))

        dp = session.delivery_properties(routing_key=key)
        mp = session.message_properties(message_id="msg6")
        session.message_transfer(destination="amq.direct", message=Message(dp, mp, "Message 6"))

        dp = session.delivery_properties(routing_key=topic)
        mp = session.message_properties(message_id="msg7")
        session.message_transfer(destination="amq.topic", message=Message(dp, mp, "Message 7"))

        session.tx_select()

        #consume and ack messages; every id is collected in one set so that a
        #single message_accept call covers all three queues
        acked = RangedSet()
        self.subscribe(session, queue=name_a, destination="sub_a")
        queue_a = session.incoming("sub_a")
        for i in range(1, 5):
            msg = queue_a.get(timeout=1)
            self.assertEqual("Message %d" % i, msg.body)
            acked.add(msg.id)

        self.subscribe(session, queue=name_b, destination="sub_b")
        queue_b = session.incoming("sub_b")
        msg = queue_b.get(timeout=1)
        self.assertEqual("Message 6", msg.body)
        acked.add(msg.id)

        #the result of subscribe is not needed, as for sub_a and sub_b
        self.subscribe(session, queue=name_c, destination="sub_c")
        queue_c = session.incoming("sub_c")
        msg = queue_c.get(timeout=1)
        self.assertEqual("Message 7", msg.body)
        acked.add(msg.id)

        session.message_accept(acked)

        #publish messages (after tx_select); note the targets differ from the
        #setup phase: the four numbered messages now go to the topic exchange
        #and message 7 is the one routed by name_a
        dp = session.delivery_properties(routing_key=topic)
        for i in range(1, 5):
            mp = session.message_properties(message_id="tx-msg%d" % i)
            session.message_transfer(destination="amq.topic", message=Message(dp, mp, "TxMessage %d" % i))

        dp = session.delivery_properties(routing_key=key)
        mp = session.message_properties(message_id="tx-msg6")
        session.message_transfer(destination="amq.direct", message=Message(dp, mp, "TxMessage 6"))

        dp = session.delivery_properties(routing_key=name_a)
        mp = session.message_properties(message_id="tx-msg7")
        session.message_transfer(message=Message(dp, mp, "TxMessage 7"))

        #the caller gets the incoming queues and the accepted ids back
        return queue_a, queue_b, queue_c, acked