in qpid_tests/broker_0_10/msg_groups.py [0:0]
def test_reroute(self):
""" Verify we can reroute messages from an acquired group.
"""
snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
" node: {x-declare: {arguments:" +
" {'qpid.group_header_key':'THE-GROUP'," +
"'qpid.shared_msg_group':1}}}}")
groups = ["A","B","A","B","C","A"]
messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
index = 0
for m in messages:
m.content['index'] = index
index += 1
snd.send(m)
# create a topic exchange for the reroute
rcvr = self.ssn.receiver("reroute-q; {create: always, delete:receiver," +
" node: {type: topic}}")
# acquire group "A"
s1 = self.setup_session()
c1 = s1.receiver("msg-group-q", options={"capacity":0})
m1 = c1.fetch(0)
assert m1.properties['THE-GROUP'] == 'A'
assert m1.content['index'] == 0
# now setup a QMF session, so we can reroute group A
self.qmf_session = qmf.console.Session()
self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
assert queue
msg_filter = { 'filter_type' : 'header_match_str',
'filter_params' : { 'header_key' : "THE-GROUP",
'header_value' : "A" }}
assert queue.msgDepth == 6
rc = queue.reroute(0, False, "reroute-q", msg_filter)
assert rc.status == 0
queue.update()
queue.msgDepth == 4 # the pending acquired A still counts!
# verify all other A's removed....
s2 = self.setup_session()
b1 = s2.receiver("msg-group-q", options={"capacity":0})
count = 0
try:
while True:
m2 = b1.fetch(0)
assert m2.properties['THE-GROUP'] != 'A'
count += 1
except Empty:
pass
assert count == 3 # only 3 really available
# and what of reroute-q?
count = 0
try:
while True:
m2 = rcvr.fetch(0)
assert m2.properties['THE-GROUP'] == 'A'
assert m2.content['index'] == 2 or m2.content['index'] == 5
count += 1
except Empty:
pass
assert count == 2
s1.acknowledge() # ack the consumed A-0
self.qmf_session.delBroker(self.qmf_broker)