in qpid_tests/broker_0_10/tx.py [0:0]
def perform_txn_work(self, session, name_a, name_b, name_c):
"""
Utility method that does some setup and some work under a transaction. Used for testing both
commit and rollback
"""
#setup:
self.declare_queues([name_a, name_b, name_c])
key = "my_key_" + name_b
topic = "my_topic_" + name_c
session.exchange_bind(queue=name_b, exchange="amq.direct", binding_key=key)
session.exchange_bind(queue=name_c, exchange="amq.topic", binding_key=topic)
dp = session.delivery_properties(routing_key=name_a)
for i in range(1, 5):
mp = session.message_properties(message_id="msg%d" % i)
session.message_transfer(message=Message(dp, mp, "Message %d" % i))
dp = session.delivery_properties(routing_key=key)
mp = session.message_properties(message_id="msg6")
session.message_transfer(destination="amq.direct", message=Message(dp, mp, "Message 6"))
dp = session.delivery_properties(routing_key=topic)
mp = session.message_properties(message_id="msg7")
session.message_transfer(destination="amq.topic", message=Message(dp, mp, "Message 7"))
session.tx_select()
#consume and ack messages
acked = RangedSet()
self.subscribe(session, queue=name_a, destination="sub_a")
queue_a = session.incoming("sub_a")
for i in range(1, 5):
msg = queue_a.get(timeout=1)
acked.add(msg.id)
self.assertEqual("Message %d" % i, msg.body)
self.subscribe(session, queue=name_b, destination="sub_b")
queue_b = session.incoming("sub_b")
msg = queue_b.get(timeout=1)
self.assertEqual("Message 6", msg.body)
acked.add(msg.id)
sub_c = self.subscribe(session, queue=name_c, destination="sub_c")
queue_c = session.incoming("sub_c")
msg = queue_c.get(timeout=1)
self.assertEqual("Message 7", msg.body)
acked.add(msg.id)
session.message_accept(acked)
dp = session.delivery_properties(routing_key=topic)
#publish messages
for i in range(1, 5):
mp = session.message_properties(message_id="tx-msg%d" % i)
session.message_transfer(destination="amq.topic", message=Message(dp, mp, "TxMessage %d" % i))
dp = session.delivery_properties(routing_key=key)
mp = session.message_properties(message_id="tx-msg6")
session.message_transfer(destination="amq.direct", message=Message(dp, mp, "TxMessage 6"))
dp = session.delivery_properties(routing_key=name_a)
mp = session.message_properties(message_id="tx-msg7")
session.message_transfer(message=Message(dp, mp, "TxMessage 7"))
return queue_a, queue_b, queue_c, acked