in qpid_tests/broker_0_10/msg_groups.py [0:0]
def test_purge_acquired(self):
""" Verify we can purge messages from an acquired group.
"""
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
" {'qpid.group_header_key':'THE-GROUP'," +
"'qpid.shared_msg_group':1}}}}")
groups = ["A","B","A","B","C","A"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
index = 0
for m in messages:
m.content['index'] = index
index += 1
snd.send(m)
# acquire group "A"
s1 = self.setup_session()
c1 = s1.receiver("msg-group-q", options={"capacity":0})
m1 = c1.fetch(0)
assert m1.properties['THE-GROUP'] == 'A'
assert m1.content['index'] == 0
# now setup a QMF session, so we can purge group A
self.qmf_session = qmf.console.Session()
self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
assert queue
msg_filter = { 'filter_type' : 'header_match_str',
'filter_params' : { 'header_key' : "THE-GROUP",
'header_value' : "A" }}
assert queue.msgDepth == 6
rc = queue.purge(0, msg_filter)
assert rc.status == 0
queue.update()
queue.msgDepth == 4 # the pending acquired A still counts!
s1.acknowledge()
# verify all other A's removed....
s2 = self.setup_session()
b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
count = 0
try:
while True:
m2 = b1.fetch(0)
assert m2.properties['THE-GROUP'] != 'A'
count += 1
except Empty:
pass
assert count == 3 # only 3 really available
s1.acknowledge() # ack the consumed A-0
self.qmf_session.delBroker(self.qmf_broker)