def test_move_count()

in qpid_tests/broker_0_10/msg_groups.py [0:0]


    def test_move_count(self):
        """ Verify we can move a fixed number of messages from an acquired group.
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","B","A","B","C","A"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        # set up destination queue
        rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
                                 " node: {x-declare: {arguments:" +
                                 " {'qpid.group_header_key':'THE-GROUP'," +
                                 "'qpid.shared_msg_group':1}}}}")

        # now setup a QMF session, so we can move group B
        self.qmf_session = qmf.console.Session()
        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
        brokers = self.qmf_session.getObjects(_class="broker")
        assert len(brokers) == 1
        broker = brokers[0]
        msg_filter = { 'filter_type' : 'header_match_str',
                       'filter_params' : { 'header_key' : "THE-GROUP",
                                           'header_value' : "B" }}
        rc = broker.queueMoveMessages("msg-group-q", "dest-q", 3, msg_filter)
        assert rc.status == 0

        # verify all B's removed from msg-group-q
        s2 = self.setup_session()
        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
        count = 0
        try:
            while True:
                m2 = b1.fetch(0)
                assert m2.properties['THE-GROUP'] != 'B'
                count += 1
        except Empty:
            pass
        assert count == 4

        # verify the moved B's are at the dest-q
        s2 = self.setup_session()
        b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":0})
        count = 0
        try:
            while True:
                m2 = b1.fetch(0)
                assert m2.properties['THE-GROUP'] == 'B'
                assert m2.content['index'] == 1 or m2.content['index'] == 3
                count += 1
        except Empty:
            pass
        assert count == 2

        self.qmf_session.delBroker(self.qmf_broker)