in qpid_tests/broker_0_10/msg_groups.py [0:0]
def test_ttl_expire(self):
""" Verify that expired (TTL) group messages are skipped correctly
"""
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
" {'qpid.group_header_key':'THE-GROUP'," +
"'qpid.shared_msg_group':1}}}}")
groups = ["A","B","C","A","B","C"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
index = 0
for m in messages:
m.content['index'] = index
index += 1
if m.properties['THE-GROUP'] == 'B':
m.ttl = 1
snd.send(m)
sleep(2) # let all B's expire
# create consumers on separate sessions: C1,C2
s1 = self.setup_session()
c1 = s1.receiver("msg-group-q", options={"capacity":0})
s2 = self.setup_session()
c2 = s2.receiver("msg-group-q", options={"capacity":0})
# C1 should acquire A-0, then C2 should acquire C-2, Group B should
# expire and never be fetched
m1 = c1.fetch(0)
assert m1.properties['THE-GROUP'] == 'A'
assert m1.content['index'] == 0
m2 = c2.fetch(0)
assert m2.properties['THE-GROUP'] == 'C'
assert m2.content['index'] == 2
m1 = c1.fetch(0)
assert m1.properties['THE-GROUP'] == 'A'
assert m1.content['index'] == 3
m2 = c2.fetch(0)
assert m2.properties['THE-GROUP'] == 'C'
assert m2.content['index'] == 5
# there should be no more left for either consumer
try:
mx = c1.fetch(0)
assert False # should never get here
except Empty:
pass
try:
mx = c2.fetch(0)
assert False # should never get here
except Empty:
pass
c1.session.acknowledge()
c2.session.acknowledge()
c1.close()
c2.close()
snd.close()