def test_send_transaction()

in qpid_tests/broker_0_10/msg_groups.py [0:0]


    def test_send_transaction(self):
        """ Verify message group delivery when the sender is transactional.

        A transactional sender publishes messages for groups "A" and "B" to
        a queue declared with 'qpid.group_header_key' set to 'THE-GROUP',
        committing only some of them at each step.  Two consumers (C1 and
        C2), each on its own transactional session with capacity 0, then
        check that:

        - committed messages can be fetched (A0 by C1, B1 by C2);
        - a message that was sent but not yet committed cannot be fetched
          by either consumer (fetch raises Empty);
        - once committed, A2 is fetched by C1 and B3 by C2;
        - after C1 rolls back its session, C2 can fetch A0 and A2.
        """
        ssn = self.conn.session(transactional=True)
        snd = ssn.sender("msg-group-q; {create:always, delete:sender,"
                         " node: {x-declare: {arguments:"
                         " {'qpid.group_header_key':'THE-GROUP',"
                         "'qpid.shared_msg_group':1}}}}")

        def send(index, group):
            # Publish one message tagged with its group; it remains
            # uncommitted until snd.session.commit() is called.
            snd.send(Message(content={'index': index},
                             properties={"THE-GROUP": group}))

        def expect(rcv, group, index):
            # Fetch without waiting (timeout 0) and check which message
            # arrived.  An Empty raised here propagates and fails the test.
            msg = rcv.fetch(0)
            assert msg.properties['THE-GROUP'] == group, \
                "expected group %r, got %r" % (group,
                                               msg.properties['THE-GROUP'])
            assert msg.content['index'] == index, \
                "expected index %r, got %r" % (index, msg.content['index'])
            return msg

        def expect_empty(rcv):
            # Fetch without waiting and require that nothing is available.
            # Raise explicitly rather than `assert False` so the check is
            # not stripped when Python runs with -O.
            try:
                extra = rcv.fetch(0)
            except Empty:
                return
            raise AssertionError("unexpected message fetched: %r" % (extra,))

        send(0, "A")
        send(1, "B")
        snd.session.commit()
        send(2, "A")

        # Queue: [A0,B1, (uncommitted: A2) ]

        s1 = self.conn.session(transactional=True)
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        s2 = self.conn.session(transactional=True)
        c2 = s2.receiver("msg-group-q", options={"capacity":0})

        # C1 gets A0, group A
        expect(c1, 'A', 0)

        # C2 gets B1, group B
        expect(c2, 'B', 1)

        # Since A2 uncommitted, there should be nothing left to fetch
        expect_empty(c1)
        expect_empty(c2)

        snd.session.commit()
        send(3, "B")

        # Queue: [A2, (uncommitted: B3) ]

        # B3 has yet to be committed, so C2 should see nothing available:
        expect_empty(c2)

        # but A2 should be available to C1
        expect(c1, 'A', 2)

        # now make B3 available
        snd.session.commit()

        # C1 should still be done:
        expect_empty(c1)

        # but C2 should find the new B
        expect(c2, 'B', 3)

        # extra: have C1 rollback, verify C2 finds the released 'A' messages
        c1.session.rollback()

        ## Q: ["A0","A2"]

        # C2 should be able to get the released A messages, in order
        expect(c2, 'A', 0)
        expect(c2, 'A', 2)

        c2.session.acknowledge()
        c2.session.commit()