in qpid_tests/broker_0_10/msg_groups.py [0:0]
def test_send_transaction(self):
    """ Verify behavior when sender is using transactions.

    Messages sent on the transactional sender session are expected to be
    unavailable to the two consumers until the sender session commits.
    At the end, consumer C1 rolls back its session and consumer C2 is
    expected to receive the group 'A' messages C1 had fetched.
    """
    def expect_message(receiver, group, index):
        # Fetch with a timeout of 0 and check which message was delivered.
        fetched = receiver.fetch(0)
        got_group = fetched.properties['THE-GROUP']
        got_index = fetched.content['index']
        assert got_group == group, \
            "expected group %r, got %r" % (group, got_group)
        assert got_index == index, \
            "expected index %r, got %r" % (index, got_index)
        return fetched

    def expect_empty(receiver):
        # Only the fetch is guarded, so the failure raised below can never
        # be confused with the expected Empty.  An explicit raise (rather
        # than "assert False") also survives running under python -O.
        try:
            unexpected = receiver.fetch(0)
        except Empty:
            return
        raise AssertionError("expected nothing to fetch, got %r"
                             % (unexpected,))

    ssn = self.conn.session(transactional=True)
    snd = ssn.sender("msg-group-q; {create:always, delete:sender," +
                     " node: {x-declare: {arguments:" +
                     " {'qpid.group_header_key':'THE-GROUP'," +
                     "'qpid.shared_msg_group':1}}}}")

    snd.send(Message(content={'index':0}, properties={"THE-GROUP": "A"}))
    snd.send(Message(content={'index':1}, properties={"THE-GROUP": "B"}))
    snd.session.commit()
    snd.send(Message(content={'index':2}, properties={"THE-GROUP": "A"}))

    # Queue: [A0, B1, (uncommitted: A2)]

    s1 = self.conn.session(transactional=True)
    c1 = s1.receiver("msg-group-q", options={"capacity":0})
    s2 = self.conn.session(transactional=True)
    c2 = s2.receiver("msg-group-q", options={"capacity":0})

    # C1 gets A0, group A
    expect_message(c1, 'A', 0)

    # C2 gets B1, group B
    expect_message(c2, 'B', 1)

    # Since A2 is uncommitted, there should be nothing left to fetch
    expect_empty(c1)
    expect_empty(c2)

    snd.session.commit()
    snd.send(Message(content={'index':3}, properties={"THE-GROUP": "B"}))

    # Queue: [A2, (uncommitted: B3)]

    # B3 has yet to be committed, so C2 should see nothing available:
    expect_empty(c2)

    # but A2 should be available to C1
    expect_message(c1, 'A', 2)

    # now make B3 available
    snd.session.commit()

    # C1 should still be done:
    expect_empty(c1)

    # but C2 should find the new B
    expect_message(c2, 'B', 3)

    # extra: have C1 rollback, verify C2 finds the released 'A' messages
    c1.session.rollback()

    # Queue: [A0, A2]

    # C2 should be able to get the released A messages, in order
    expect_message(c2, 'A', 0)
    expect_message(c2, 'A', 2)

    c2.session.acknowledge()
    c2.session.commit()