in qpid_tests/broker_0_10/msg_groups.py [0:0]
def test_close(self):
    """ Verify behavior when a consumer that 'owns' a group closes.

    Four messages (groups A, A, B, B carrying indices 0..3) are sent to a
    queue declared with 'THE-GROUP' as its group header key.  Two
    receivers on separate sessions each fetch one message; the first
    receiver is then closed, followed by its session, and at every step
    the test checks which messages the second receiver can fetch.
    """
    address = ("msg-group-q; {create:always, delete:sender,"
               " node: {x-declare: {arguments:"
               " {'qpid.group_header_key':'THE-GROUP',"
               "'qpid.shared_msg_group':1}}}}")
    snd = self.ssn.sender(address)

    def expect_next(rcv, group, index):
        # Fetch one message without waiting and check its group and index.
        msg = rcv.fetch(0)
        assert msg.properties['THE-GROUP'] == group
        assert msg.content['index'] == index

    def expect_nothing(rcv):
        # A fetch that does not wait must raise Empty.
        try:
            rcv.fetch(0)
        except Empty:
            pass
        else:
            assert False  # should never get here

    group_ids = ["A", "A", "B", "B"]
    outgoing = [Message(content={}, properties={"THE-GROUP": gid})
                for gid in group_ids]
    # Stamp every message with its send position before sending it.
    for position, msg in enumerate(outgoing):
        msg.content['index'] = position
        snd.send(msg)

    first_ssn = self.setup_session()
    first_rcv = first_ssn.receiver("msg-group-q", options={"capacity": 0})
    second_ssn = self.setup_session()
    second_rcv = second_ssn.receiver("msg-group-q", options={"capacity": 0})

    # The first receiver takes ownership of group A ...
    expect_next(first_rcv, 'A', 0)

    # ... and the second receiver takes ownership of group B.
    expect_next(second_rcv, 'B', 2)

    # The first receiver goes away.
    first_rcv.close()

    # Its session is still active, so group A stays blocked for the
    # second receiver, which should only see the remaining B message.
    expect_next(second_rcv, 'B', 3)

    # Nothing else is available to the second receiver for now.
    expect_nothing(second_rcv)

    # Closing the first session releases group A.
    first_ssn.close()

    # Both A messages, including the one fetched earlier by the first
    # receiver, are now delivered to the second receiver in order.
    expect_next(second_rcv, 'A', 0)
    expect_next(second_rcv, 'A', 1)

    # The queue should be drained at this point.
    expect_nothing(second_rcv)