def test_purge_count()

in qpid_tests/broker_0_10/msg_groups.py [0:0]


    def test_purge_count(self):
        """ Verify we can purge a fixed number of messages from an acquired
        group.

        Six messages tagged A,B,A,B,C,A (via the 'THE-GROUP' header) are sent
        to a shared-message-group queue.  One consumer acquires the first
        message of group A and leaves it unacknowledged, then a QMF purge
        limited to a single message matching group A is issued.  The test
        checks that the queue depth drops by exactly one and that a browser
        still sees three non-A messages and one A message.
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        # Each message carries its send position in content['index'] so the
        # receiver side can tell which member of a group it got.
        groups = ["A","B","A","B","C","A"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        for index, m in enumerate(messages):
            m.content['index'] = index
            snd.send(m)

        # acquire group "A"
        s1 = self.setup_session()
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        # now setup a QMF session, so we can purge group A
        self.qmf_session = qmf.console.Session()
        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
        try:
            queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
            assert queue
            msg_filter = { 'filter_type' : 'header_match_str',
                           'filter_params' : { 'header_key' : "THE-GROUP",
                                               'header_value' : "A" }}
            assert queue.msgDepth == 6
            # purge at most one message matching the group-A filter
            rc = queue.purge(1, msg_filter)
            assert rc.status == 0
            queue.update()
            # the pending acquired A still counts!
            assert queue.msgDepth == 5

            # verify that only one of the remaining A's was removed: browse
            # the queue until it is drained, tallying A and non-A messages.
            s2 = self.setup_session()
            b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
            count = 0
            a_count = 0
            try:
                while True:
                    m2 = b1.fetch(0)
                    if m2.properties['THE-GROUP'] != 'A':
                        count += 1
                    else:
                        a_count += 1
            except Empty:
                # fetch(0) raises Empty once nothing more is available
                pass
            assert count == 3   # non-A's
            # assumes the acquired message was not the one purged and
            # regular browsers don't get acquired messages
            assert a_count == 1
            s1.acknowledge()    # ack the consumed A-0
        finally:
            # always drop the QMF broker connection, even when an assertion
            # above fails, so it is not leaked into later tests
            self.qmf_session.delBroker(self.qmf_broker)