def test_simple()

in qpid_tests/broker_0_10/msg_groups.py [0:0]


    def test_simple(self):
        """ Verify simple acquire/accept actions on a set of grouped
        messages shared between two receivers.
        """
        ## Create a msg group queue

        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","A","A","B","B","B","C","C","C"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        ## Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
        ## Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,

        # create consumers on separate sessions: C1,C2
        s1 = self.setup_session()
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        s2 = self.setup_session()
        c2 = s2.receiver("msg-group-q", options={"capacity":0})

        # C1 should acquire A-0, then C2 should acquire B-3

        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'B'
        assert m2.content['index'] == 3

        # C1 Acknowledge A-0
        c1.session.acknowledge(m1)

        # C2 should next acquire A-1
        m3 = c2.fetch(0)
        assert m3.properties['THE-GROUP'] == 'A'
        assert m3.content['index'] == 1

        # C1 should next acquire C-6, since groups A&B are held by c2
        m4 = c1.fetch(0)
        assert m4.properties['THE-GROUP'] == 'C'
        assert m4.content['index'] == 6

        ## Queue = XXX, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
        ## Owners= ---, ^C2, +C2, ^C2, +C2, +C2, ^C1, +C1, +C1,

        # C2 Acknowledge B-3, freeing up the rest of B group
        c2.session.acknowledge(m2)

        ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8...
        ## Owners= ---, ^C2, +C2, ---, ---, ---, ^C1, +C1, +C1,

        # C1 should now acquire B-4, since it is next "free"
        m5 = c1.fetch(0)
        assert m5.properties['THE-GROUP'] == 'B'
        assert m5.content['index'] == 4

        ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8...
        ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ^C1, +C1, +C1,

        # C1 acknowledges C-6, freeing the C group
        c1.session.acknowledge(m4)

        ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8...
        ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ---, ---, ---

        # C2 should next fetch A-2, followed by C-7
        m7 = c2.fetch(0)
        assert m7.properties['THE-GROUP'] == 'A'
        assert m7.content['index'] == 2

        m8 = c2.fetch(0)
        assert m8.properties['THE-GROUP'] == 'C'
        assert m8.content['index'] == 7

        ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8...
        ## Owners= ---, ^C2, ^C2, ---, ^C1, +C1, ---, ^C2, +C2

        # have C2 ack all fetched messages, freeing C-8
        c2.session.acknowledge()

        ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8...
        ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, ---

        # the next fetch of C2 would get C-8, since B-5 is "owned"
        m9 = c2.fetch(0)
        assert m9.properties['THE-GROUP'] == 'C'
        assert m9.content['index'] == 8

        ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8...
        ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, ^C2

        # C1 acks B-4, freeing B-5 for consumption
        c1.session.acknowledge(m5)

        ## Queue = XXX, XXX, XXX, XXX, XXX, b-5, XXX, XXX, c-8...
        ## Owners= ---, ---, ---, ---, ---, ^C2, ---, ---, ^C2

        # the next fetch of C2 would get B-5
        m10 = c2.fetch(0)
        assert m10.properties['THE-GROUP'] == 'B'
        assert m10.content['index'] == 5

        # there should be no more left for C1:
        try:
            mx = c1.fetch(0)
            assert False     # should never get here
        except Empty:
            pass

        c1.session.acknowledge()
        c2.session.acknowledge()
        c1.close()
        c2.close()
        snd.close()