in qpid_tests/broker_0_10/msg_groups.py [0:0]
def test_transaction(self):
""" Verify behavior when using transactions.
"""
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
" {'qpid.group_header_key':'THE-GROUP'," +
"'qpid.shared_msg_group':1}}}}")
groups = ["A","A","B","B","A","B"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
index = 0
for m in messages:
m.content['index'] = index
index += 1
snd.send(m)
s1 = self.conn.session(transactional=True)
c1 = s1.receiver("msg-group-q", options={"capacity":0})
s2 = self.conn.session(transactional=True)
c2 = s2.receiver("msg-group-q", options={"capacity":0})
# C1 gets group A
m1 = c1.fetch(0)
assert m1.properties['THE-GROUP'] == 'A'
assert m1.content['index'] == 0
# C2 gets group B
m2 = c2.fetch(0)
assert m2.properties['THE-GROUP'] == 'B'
assert m2.content['index'] == 2
s1.acknowledge(m1) # A-0 consumed, A group freed
s2.acknowledge(m2) # B-2 consumed, B group freed
s1.commit() # A-0 consumption done, A group now free
s2.rollback() # releases B-2, and group B
## Q: ["A1","B2","B3","A4","B5"]
# C2 should be able to get the next A
m3 = c2.fetch(0)
assert m3.properties['THE-GROUP'] == 'A'
assert m3.content['index'] == 1
# C1 should be able to get B-2
m4 = c1.fetch(0)
assert m4.properties['THE-GROUP'] == 'B'
assert m4.content['index'] == 2
s2.acknowledge(m3) # C2 consumes A-1
s1.acknowledge(m4) # C1 consumes B-2
s1.commit() # C1 consume B-2 occurs, free group B
## Q: [["A1",]"B3","A4","B5"]
# A-1 is still considered owned by C2, since the commit has yet to
# occur, so the next available to C1 would be B-3
m5 = c1.fetch(0) # B-3
assert m5.properties['THE-GROUP'] == 'B'
assert m5.content['index'] == 3
# and C2 should find A-4 available, since it owns the A group
m6 = c2.fetch(0) # A-4
assert m6.properties['THE-GROUP'] == 'A'
assert m6.content['index'] == 4
s2.acknowledge(m6) # C2 consumes A-4
# uh-oh, A-1 and A-4 released, along with A group
s2.rollback()
## Q: ["A1",["B3"],"A4","B5"]
m7 = c1.fetch(0) # A-1 is found
assert m7.properties['THE-GROUP'] == 'A'
assert m7.content['index'] == 1
## Q: [["A1"],["B3"],"A4","B5"]
# since C1 "owns" both A and B group, C2 should find nothing available
try:
m8 = c2.fetch(0)
assert False # should not get here
except Empty:
pass
# C1 next gets A4
m9 = c1.fetch(0)
assert m9.properties['THE-GROUP'] == 'A'
assert m9.content['index'] == 4
s1.acknowledge()
## Q: [["A1"],["B3"],["A4"],"B5"]
# even though C1 acknowledges A1,B3, and A4, B5 is still considered
# owned as the commit has yet to take place
try:
m10 = c2.fetch(0)
assert False # should not get here
except Empty:
pass
# now A1,B3,A4 dequeued, B5 should be free
s1.commit()
## Q: ["B5"]
m11 = c2.fetch(0)
assert m11.properties['THE-GROUP'] == 'B'
assert m11.content['index'] == 5
s2.acknowledge()
s2.commit()