in qpid_tests/broker_0_10/msg_groups.py [0:0]
def test_move_all(self):
    """ Verify we can move messages from an acquired group.

    Six messages tagged with groups A, B, A, B, C, A (indexes 0..5) are
    sent to "msg-group-q".  A consumer fetches the first message (group A,
    index 0) and leaves it unacknowledged.  A QMF queueMoveMessages call
    with a header filter on THE-GROUP == "A" is then issued against
    "dest-q", and the test checks that:

      * only the three non-A messages can still be fetched from
        "msg-group-q", and
      * exactly the two remaining A messages (indexes 2 and 5) can be
        browsed on "dest-q".

    The QMF broker connection is released in a ``finally`` clause so a
    failing assertion does not leave it attached to the console session.
    """
    snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                          " node: {x-declare: {arguments:" +
                          " {'qpid.group_header_key':'THE-GROUP'," +
                          "'qpid.shared_msg_group':1}}}}")

    # Each message carries its position in the send order so that the
    # moved messages can be identified later on.
    groups = ["A", "B", "A", "B", "C", "A"]
    for index, group in enumerate(groups):
        snd.send(Message(content={'index': index},
                         properties={"THE-GROUP": group}))

    # set up destination queue; the receiver is held in a local because
    # the address asks for the queue to be deleted along with the receiver
    rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
                             " node: {x-declare: {arguments:" +
                             " {'qpid.group_header_key':'THE-GROUP'," +
                             "'qpid.shared_msg_group':1}}}}")

    # acquire group "A" by fetching (and not yet acknowledging) its first
    # message
    s1 = self.setup_session()
    c1 = s1.receiver("msg-group-q", options={"capacity": 0})
    m1 = c1.fetch(0)
    assert m1.properties['THE-GROUP'] == 'A'
    assert m1.content['index'] == 0

    # now setup a QMF session, so we can move what's left of group A
    self.qmf_session = qmf.console.Session()
    self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
    try:
        brokers = self.qmf_session.getObjects(_class="broker")
        assert len(brokers) == 1
        broker = brokers[0]

        msg_filter = {'filter_type': 'header_match_str',
                      'filter_params': {'header_key': "THE-GROUP",
                                        'header_value': "A"}}
        # NOTE(review): the third argument (0) is presumably the "no
        # limit" quantity for queueMoveMessages -- confirm against the
        # broker management schema.
        rc = broker.queueMoveMessages("msg-group-q", "dest-q", 0, msg_filter)
        assert rc.status == 0

        # verify all other A's removed from msg-group-q
        src_ssn = self.setup_session()
        src_rcvr = src_ssn.receiver("msg-group-q", options={"capacity": 0})
        remaining = 0
        try:
            # fetch(0) raises Empty once nothing more is available, which
            # is what terminates the loop
            while True:
                msg = src_rcvr.fetch(0)
                assert msg.properties['THE-GROUP'] != 'A'
                remaining += 1
        except Empty:
            pass
        assert remaining == 3   # only 3 really available

        # verify the moved A's are at the dest-q
        dest_ssn = self.setup_session()
        dest_browser = dest_ssn.receiver("dest-q; {mode: browse}",
                                         options={"capacity": 0})
        moved = 0
        try:
            while True:
                msg = dest_browser.fetch(0)
                assert msg.properties['THE-GROUP'] == 'A'
                assert msg.content['index'] in (2, 5)
                moved += 1
        except Empty:
            pass
        assert moved == 2   # two A's moved

        s1.acknowledge()    # ack the consumed A-0
    finally:
        # Always detach from the broker, even when an assertion above
        # failed, so the console connection is not leaked.
        self.qmf_session.delBroker(self.qmf_broker)