def test_transaction()

in qpid_tests/broker_0_10/msg_groups.py [0:0]


    def test_transaction(self):
        """ Verify behavior when using transactions.
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","A","B","B","A","B"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        s1 = self.conn.session(transactional=True)
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        s2 = self.conn.session(transactional=True)
        c2 = s2.receiver("msg-group-q", options={"capacity":0})

        # C1 gets group A
        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        # C2 gets group B
        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'B'
        assert m2.content['index'] == 2

        s1.acknowledge(m1)  # A-0 consumed, A group freed
        s2.acknowledge(m2)  # B-2 consumed, B group freed

        s1.commit()    # A-0 consumption done, A group now free
        s2.rollback()  # releases B-2, and group B

        ## Q: ["A1","B2","B3","A4","B5"]

        # C2 should be able to get the next A
        m3 = c2.fetch(0)
        assert m3.properties['THE-GROUP'] == 'A'
        assert m3.content['index'] == 1

        # C1 should be able to get B-2
        m4 = c1.fetch(0)
        assert m4.properties['THE-GROUP'] == 'B'
        assert m4.content['index'] == 2

        s2.acknowledge(m3)  # C2 consumes A-1
        s1.acknowledge(m4)  # C1 consumes B-2
        s1.commit()    # C1 consume B-2 occurs, free group B

        ## Q: [["A1",]"B3","A4","B5"]

        # A-1 is still considered owned by C2, since the commit has yet to
        # occur, so the next available to C1 would be B-3
        m5 = c1.fetch(0)   # B-3
        assert m5.properties['THE-GROUP'] == 'B'
        assert m5.content['index'] == 3

        # and C2 should find A-4 available, since it owns the A group
        m6 = c2.fetch(0)  # A-4
        assert m6.properties['THE-GROUP'] == 'A'
        assert m6.content['index'] == 4

        s2.acknowledge(m6)  # C2 consumes A-4

        # uh-oh, A-1 and A-4 released, along with A group
        s2.rollback()

        ## Q: ["A1",["B3"],"A4","B5"]
        m7 = c1.fetch(0)   # A-1 is found
        assert m7.properties['THE-GROUP'] == 'A'
        assert m7.content['index'] == 1

        ## Q: [["A1"],["B3"],"A4","B5"]
        # since C1 "owns" both A and B group, C2 should find nothing available
        try:
            m8 = c2.fetch(0)
            assert False    # should not get here
        except Empty:
            pass

        # C1 next gets A4
        m9 = c1.fetch(0)
        assert m9.properties['THE-GROUP'] == 'A'
        assert m9.content['index'] == 4

        s1.acknowledge()

        ## Q: [["A1"],["B3"],["A4"],"B5"]
        # even though C1 acknowledges A1,B3, and A4, B5 is still considered
        # owned as the commit has yet to take place
        try:
            m10 = c2.fetch(0)
            assert False    # should not get here
        except Empty:
            pass

        # now A1,B3,A4 dequeued, B5 should be free
        s1.commit()

        ## Q: ["B5"]
        m11 = c2.fetch(0)
        assert m11.properties['THE-GROUP'] == 'B'
        assert m11.content['index'] == 5

        s2.acknowledge()
        s2.commit()