def test_close()

in qpid_tests/broker_0_10/msg_groups.py [0:0]


    def test_close(self):
        """ Verify behavior when a consumer that 'owns' a group closes.
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","A","B","B"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        s1 = self.setup_session()
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        s2 = self.setup_session()
        c2 = s2.receiver("msg-group-q", options={"capacity":0})

        # C1 will own group A
        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        # C2 will own group B
        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'B'
        assert m2.content['index'] == 2

        # C1 shuffles off the mortal coil...
        c1.close()

        # but the session (s1) remains active, so "A" remains blocked
        # from c2, c2 should fetch the next B-3

        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'B'
        assert m2.content['index'] == 3

        # and there should be no more messages available for C2
        try:
            m2 = c2.fetch(0)
            assert False     # should never get here
        except Empty:
            pass

        # close session s1, releasing the A group
        s1.close()

        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'A'
        assert m2.content['index'] == 0

        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'A'
        assert m2.content['index'] == 1

        # and there should be no more messages now
        try:
            m2 = c2.fetch(0)
            assert False     # should never get here
        except Empty:
            pass