#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import absolute_import
from qpid.messaging import *
from qpid.tests.messaging import Base
import qmf.console

from time import sleep
#
# Tests the Broker's support for message groups
#

class MultiConsumerMsgGroupTests(Base):
    """
    Tests for the behavior of multi-consumer message groups.  These tests allow
    a messages from the same group be consumed by multiple different clients as
    long as each message is processed "in sequence".  See QPID-3346 for
    details.
    """

    def setup_connection(self):
        return Connection.establish(self.broker, **self.connection_options())

    def setup_session(self):
        return self.conn.session()

    def test_simple(self):
        """ Verify simple acquire/accept actions on a set of grouped
        messages shared between two receivers.
        """
        ## Create a msg group queue

        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","A","A","B","B","B","C","C","C"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        ## Queue = a-0, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
        ## Owners= ---, ---, ---, ---, ---, ---, ---, ---, ---,

        # create consumers on separate sessions: C1,C2
        s1 = self.setup_session()
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        s2 = self.setup_session()
        c2 = s2.receiver("msg-group-q", options={"capacity":0})

        # C1 should acquire A-0, then C2 should acquire B-3

        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'B'
        assert m2.content['index'] == 3

        # C1 Acknowledge A-0
        c1.session.acknowledge(m1)

        # C2 should next acquire A-1
        m3 = c2.fetch(0)
        assert m3.properties['THE-GROUP'] == 'A'
        assert m3.content['index'] == 1

        # C1 should next acquire C-6, since groups A&B are held by c2
        m4 = c1.fetch(0)
        assert m4.properties['THE-GROUP'] == 'C'
        assert m4.content['index'] == 6

        ## Queue = XXX, a-1, a-2, b-3, b-4, b-5, c-6, c-7, c-8...
        ## Owners= ---, ^C2, +C2, ^C2, +C2, +C2, ^C1, +C1, +C1,

        # C2 Acknowledge B-3, freeing up the rest of B group
        c2.session.acknowledge(m2)

        ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8...
        ## Owners= ---, ^C2, +C2, ---, ---, ---, ^C1, +C1, +C1,

        # C1 should now acquire B-4, since it is next "free"
        m5 = c1.fetch(0)
        assert m5.properties['THE-GROUP'] == 'B'
        assert m5.content['index'] == 4

        ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, c-6, c-7, c-8...
        ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ^C1, +C1, +C1,

        # C1 acknowledges C-6, freeing the C group
        c1.session.acknowledge(m4)

        ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8...
        ## Owners= ---, ^C2, +C2, ---, ^C1, +C1, ---, ---, ---

        # C2 should next fetch A-2, followed by C-7
        m7 = c2.fetch(0)
        assert m7.properties['THE-GROUP'] == 'A'
        assert m7.content['index'] == 2

        m8 = c2.fetch(0)
        assert m8.properties['THE-GROUP'] == 'C'
        assert m8.content['index'] == 7

        ## Queue = XXX, a-1, a-2, XXX, b-4, b-5, XXX, c-7, c-8...
        ## Owners= ---, ^C2, ^C2, ---, ^C1, +C1, ---, ^C2, +C2

        # have C2 ack all fetched messages, freeing C-8
        c2.session.acknowledge()

        ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8...
        ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, ---

        # the next fetch of C2 would get C-8, since B-5 is "owned"
        m9 = c2.fetch(0)
        assert m9.properties['THE-GROUP'] == 'C'
        assert m9.content['index'] == 8

        ## Queue = XXX, XXX, XXX, XXX, b-4, b-5, XXX, XXX, c-8...
        ## Owners= ---, ---, ---, ---, ^C1, +C1, ---, ---, ^C2

        # C1 acks B-4, freeing B-5 for consumption
        c1.session.acknowledge(m5)

        ## Queue = XXX, XXX, XXX, XXX, XXX, b-5, XXX, XXX, c-8...
        ## Owners= ---, ---, ---, ---, ---, ^C2, ---, ---, ^C2

        # the next fetch of C2 would get B-5
        m10 = c2.fetch(0)
        assert m10.properties['THE-GROUP'] == 'B'
        assert m10.content['index'] == 5

        # there should be no more left for C1:
        try:
            mx = c1.fetch(0)
            assert False     # should never get here
        except Empty:
            pass

        c1.session.acknowledge()
        c2.session.acknowledge()
        c1.close()
        c2.close()
        snd.close()

    def test_simple_browse(self):
        """ Test the behavior of a browsing subscription on a message grouping
        queue.
        """

        ## Create a msg group queue

        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","B","A","B","C"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        ## Queue = A-0, B-1, A-2, b-3, C-4
        ## Owners= ---, ---, ---, ---, ---

        # create consumer and browser
        s1 = self.setup_session()
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        s2 = self.setup_session()
        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})

        m2 = b1.fetch(0)
        assert m2.properties['THE-GROUP'] == 'A'
        assert m2.content['index'] == 0

        # C1 should acquire A-0

        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        ## Queue = A-0, B-1, A-2, b-3, C-4
        ## Owners= ^C1, ---, +C1, ---, ---

        m2 = b1.fetch(0)
        assert m2.properties['THE-GROUP'] == 'B'
        assert m2.content['index'] == 1

        # verify that the browser may see A-2, even though its group is owned
        # by C1
        m2 = b1.fetch(0)
        assert m2.properties['THE-GROUP'] == 'A'
        assert m2.content['index'] == 2

        m2 = b1.fetch(0)
        assert m2.properties['THE-GROUP'] == 'B'
        assert m2.content['index'] == 3

        # verify the consumer can own groups currently seen by the browser
        m3 = c1.fetch(0)
        assert m3.properties['THE-GROUP'] == 'B'
        assert m3.content['index'] == 1

        m2 = b1.fetch(0)
        assert m2.properties['THE-GROUP'] == 'C'
        assert m2.content['index'] == 4

    def test_release(self):
        """ Verify releasing a message can free its assocated group
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","A","B","B"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        s1 = self.setup_session()
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        s2 = self.setup_session()
        c2 = s2.receiver("msg-group-q", options={"capacity":0})

        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'B'
        assert m2.content['index'] == 2

        # C1 release m1, and the first group

        s1.acknowledge(m1, Disposition(RELEASED, set_redelivered=True))

        # C2 should be able to get group 'A', msg 'A-0' now
        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'A'
        assert m2.content['index'] == 0

    def test_reject(self):
        """ Verify rejecting a message can free its associated group
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","A","B","B"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        s1 = self.setup_session()
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        s2 = self.setup_session()
        c2 = s2.receiver("msg-group-q", options={"capacity":0})

        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'B'
        assert m2.content['index'] == 2

        # C1 rejects m1, and the first group is released
        s1.acknowledge(m1, Disposition(REJECTED))

        # C2 should be able to get group 'A', msg 'A-1' now
        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'A'
        assert m2.content['index'] == 1

    def test_close(self):
        """ Verify behavior when a consumer that 'owns' a group closes.
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","A","B","B"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        s1 = self.setup_session()
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        s2 = self.setup_session()
        c2 = s2.receiver("msg-group-q", options={"capacity":0})

        # C1 will own group A
        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        # C2 will own group B
        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'B'
        assert m2.content['index'] == 2

        # C1 shuffles off the mortal coil...
        c1.close()

        # but the session (s1) remains active, so "A" remains blocked
        # from c2, c2 should fetch the next B-3

        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'B'
        assert m2.content['index'] == 3

        # and there should be no more messages available for C2
        try:
            m2 = c2.fetch(0)
            assert False     # should never get here
        except Empty:
            pass

        # close session s1, releasing the A group
        s1.close()

        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'A'
        assert m2.content['index'] == 0

        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'A'
        assert m2.content['index'] == 1

        # and there should be no more messages now
        try:
            m2 = c2.fetch(0)
            assert False     # should never get here
        except Empty:
            pass

    def test_transaction(self):
        """ Verify behavior when using transactions.
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","A","B","B","A","B"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        s1 = self.conn.session(transactional=True)
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        s2 = self.conn.session(transactional=True)
        c2 = s2.receiver("msg-group-q", options={"capacity":0})

        # C1 gets group A
        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        # C2 gets group B
        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'B'
        assert m2.content['index'] == 2

        s1.acknowledge(m1)  # A-0 consumed, A group freed
        s2.acknowledge(m2)  # B-2 consumed, B group freed

        s1.commit()    # A-0 consumption done, A group now free
        s2.rollback()  # releases B-2, and group B

        ## Q: ["A1","B2","B3","A4","B5"]

        # C2 should be able to get the next A
        m3 = c2.fetch(0)
        assert m3.properties['THE-GROUP'] == 'A'
        assert m3.content['index'] == 1

        # C1 should be able to get B-2
        m4 = c1.fetch(0)
        assert m4.properties['THE-GROUP'] == 'B'
        assert m4.content['index'] == 2

        s2.acknowledge(m3)  # C2 consumes A-1
        s1.acknowledge(m4)  # C1 consumes B-2
        s1.commit()    # C1 consume B-2 occurs, free group B

        ## Q: [["A1",]"B3","A4","B5"]

        # A-1 is still considered owned by C2, since the commit has yet to
        # occur, so the next available to C1 would be B-3
        m5 = c1.fetch(0)   # B-3
        assert m5.properties['THE-GROUP'] == 'B'
        assert m5.content['index'] == 3

        # and C2 should find A-4 available, since it owns the A group
        m6 = c2.fetch(0)  # A-4
        assert m6.properties['THE-GROUP'] == 'A'
        assert m6.content['index'] == 4

        s2.acknowledge(m6)  # C2 consumes A-4

        # uh-oh, A-1 and A-4 released, along with A group
        s2.rollback()

        ## Q: ["A1",["B3"],"A4","B5"]
        m7 = c1.fetch(0)   # A-1 is found
        assert m7.properties['THE-GROUP'] == 'A'
        assert m7.content['index'] == 1

        ## Q: [["A1"],["B3"],"A4","B5"]
        # since C1 "owns" both A and B group, C2 should find nothing available
        try:
            m8 = c2.fetch(0)
            assert False    # should not get here
        except Empty:
            pass

        # C1 next gets A4
        m9 = c1.fetch(0)
        assert m9.properties['THE-GROUP'] == 'A'
        assert m9.content['index'] == 4

        s1.acknowledge()

        ## Q: [["A1"],["B3"],["A4"],"B5"]
        # even though C1 acknowledges A1,B3, and A4, B5 is still considered
        # owned as the commit has yet to take place
        try:
            m10 = c2.fetch(0)
            assert False    # should not get here
        except Empty:
            pass

        # now A1,B3,A4 dequeued, B5 should be free
        s1.commit()

        ## Q: ["B5"]
        m11 = c2.fetch(0)
        assert m11.properties['THE-GROUP'] == 'B'
        assert m11.content['index'] == 5

        s2.acknowledge()
        s2.commit()

    def test_send_transaction(self):
        """ Verify behavior when sender is using transactions.
        """
        ssn = self.conn.session(transactional=True)
        snd = ssn.sender("msg-group-q; {create:always, delete:sender," +
                         " node: {x-declare: {arguments:" +
                         " {'qpid.group_header_key':'THE-GROUP'," +
                         "'qpid.shared_msg_group':1}}}}")

        msg = Message(content={'index':0}, properties={"THE-GROUP": "A"})
        snd.send(msg)
        msg = Message(content={'index':1}, properties={"THE-GROUP": "B"})
        snd.send(msg)
        snd.session.commit()
        msg = Message(content={'index':2}, properties={"THE-GROUP": "A"})
        snd.send(msg)

        # Queue: [A0,B1, (uncommitted: A2) ]

        s1 = self.conn.session(transactional=True)
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        s2 = self.conn.session(transactional=True)
        c2 = s2.receiver("msg-group-q", options={"capacity":0})

        # C1 gets A0, group A
        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        # C2 gets B2, group B
        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'B'
        assert m2.content['index'] == 1

        # Since A2 uncommitted, there should be nothing left to fetch
        try:
            mX = c1.fetch(0)
            assert False    # should not get here
        except Empty:
            pass
        try:
            mX = c2.fetch(0)
            assert False    # should not get here
        except Empty:
            pass

        snd.session.commit()
        msg = Message(content={'index':3}, properties={"THE-GROUP": "B"})
        snd.send(msg)

        # Queue: [A2, (uncommitted: B3) ]

        # B3 has yet to be committed, so C2 should see nothing available:
        try:
            mX = c2.fetch(0)
            assert False    # should not get here
        except Empty:
            pass

        # but A2 should be available to C1
        m3 = c1.fetch(0)
        assert m3.properties['THE-GROUP'] == 'A'
        assert m3.content['index'] == 2

        # now make B3 available
        snd.session.commit()

        # C1 should still be done:
        try:
            mX = c1.fetch(0)
            assert False    # should not get here
        except Empty:
            pass

        # but C2 should find the new B
        m4 = c2.fetch(0)
        assert m4.properties['THE-GROUP'] == 'B'
        assert m4.content['index'] == 3

        # extra: have C1 rollback, verify C2 finds the released 'A' messages
        c1.session.rollback()

        ## Q: ["A0","A2"]

        # C2 should be able to get the next A
        m5 = c2.fetch(0)
        assert m5.properties['THE-GROUP'] == 'A'
        assert m5.content['index'] == 0

        m6 = c2.fetch(0)
        assert m6.properties['THE-GROUP'] == 'A'
        assert m6.content['index'] == 2

        c2.session.acknowledge()
        c2.session.commit()

    def test_query(self):
        """ Verify the queue query method against message groups
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","B","C","A","B","C","A"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        s1 = self.setup_session()
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        s2 = self.setup_session()
        c2 = s2.receiver("msg-group-q", options={"capacity":0})

        m1 = c1.fetch(0)
        m2 = c2.fetch(0)

        # at this point, group A should be owned by C1, group B by C2, and
        # group C should be available

        # now setup a QMF session, so we can call methods
        self.qmf_session = qmf.console.Session()
        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
        brokers = self.qmf_session.getObjects(_class="broker")
        assert len(brokers) == 1
        broker = brokers[0]

        # verify the query method call's group information
        rc = broker.query("queue", "msg-group-q")
        assert rc.status == 0
        assert rc.text == "OK"
        results = rc.outArgs['results']
        assert 'qpid.message_group_queue' in results
        q_info = results['qpid.message_group_queue']
        assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP"
        assert 'group_state' in q_info and len(q_info['group_state']) == 3
        for g_info in q_info['group_state']:
            assert 'group_id' in g_info
            if g_info['group_id'] == "A":
                assert g_info['msg_count'] == 3
                assert g_info['consumer'] != ""
            elif g_info['group_id'] == "B":
                assert g_info['msg_count'] == 2
                assert g_info['consumer'] != ""
            elif g_info['group_id'] == "C":
                assert g_info['msg_count'] == 2
                assert g_info['consumer'] == ""
            else:
                assert(False)    # should never get here
        self.qmf_session.delBroker(self.qmf_broker)

    def test_purge_free(self):
        """ Verify we can purge a queue of all messages of a given "unowned"
        group.
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," + 
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","B","A","B","C","A"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        # now setup a QMF session, so we can call methods
        self.qmf_session = qmf.console.Session()
        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
        queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
        assert queue
        msg_filter = { 'filter_type' : 'header_match_str',
                       'filter_params' : { 'header_key' : "THE-GROUP",
                                           'header_value' : "B" }}
        assert queue.msgDepth == 6
        rc = queue.purge(0, msg_filter)
        assert rc.status == 0
        queue.update()
        assert queue.msgDepth == 4

        # verify all B's removed....
        s2 = self.setup_session()
        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
        count = 0
        try:
            while True:
                m2 = b1.fetch(0)
                assert m2.properties['THE-GROUP'] != 'B'
                count += 1
        except Empty:
            pass
        assert count == 4

        self.qmf_session.delBroker(self.qmf_broker)

    def test_purge_acquired(self):
        """ Verify we can purge messages from an acquired group.
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","B","A","B","C","A"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        # acquire group "A"
        s1 = self.setup_session()
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        # now setup a QMF session, so we can purge group A
        self.qmf_session = qmf.console.Session()
        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
        queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
        assert queue
        msg_filter = { 'filter_type' : 'header_match_str',
                       'filter_params' : { 'header_key' : "THE-GROUP",
                                           'header_value' : "A" }}
        assert queue.msgDepth == 6
        rc = queue.purge(0, msg_filter)
        assert rc.status == 0
        queue.update()
        queue.msgDepth == 4   # the pending acquired A still counts!
        s1.acknowledge()

        # verify all other A's removed....
        s2 = self.setup_session()
        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
        count = 0
        try:
            while True:
                m2 = b1.fetch(0)
                assert m2.properties['THE-GROUP'] != 'A'
                count += 1
        except Empty:
            pass
        assert count == 3   # only 3 really available
        s1.acknowledge()    # ack the consumed A-0
        self.qmf_session.delBroker(self.qmf_broker)

    def test_purge_count(self):
        """ Verify we can purge a fixed number of messages from an acquired
        group.
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","B","A","B","C","A"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        # acquire group "A"
        s1 = self.setup_session()
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        # now setup a QMF session, so we can purge group A
        self.qmf_session = qmf.console.Session()
        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
        queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
        assert queue
        msg_filter = { 'filter_type' : 'header_match_str',
                       'filter_params' : { 'header_key' : "THE-GROUP",
                                           'header_value' : "A" }}
        assert queue.msgDepth == 6
        rc = queue.purge(1, msg_filter)
        assert rc.status == 0
        queue.update()
        queue.msgDepth == 5   # the pending acquired A still counts!

        # verify all other A's removed....
        s2 = self.setup_session()
        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
        count = 0
        a_count = 0
        try:
            while True:
                m2 = b1.fetch(0)
                if m2.properties['THE-GROUP'] != 'A':
                    count += 1
                else:
                    a_count += 1
        except Empty:
            pass
        assert count == 3   # non-A's
        assert a_count == 1 # assumes the acquired message was not the one purged and regular browsers don't get acquired messages
        s1.acknowledge()    # ack the consumed A-0
        self.qmf_session.delBroker(self.qmf_broker)

    def test_move_all(self):
        """ Verify we can move messages from an acquired group.
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","B","A","B","C","A"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        # set up destination queue
        rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
                                 " node: {x-declare: {arguments:" +
                                 " {'qpid.group_header_key':'THE-GROUP'," +
                                 "'qpid.shared_msg_group':1}}}}")

        # acquire group "A"
        s1 = self.setup_session()
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        # now setup a QMF session, so we can move what's left of group A
        self.qmf_session = qmf.console.Session()
        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
        brokers = self.qmf_session.getObjects(_class="broker")
        assert len(brokers) == 1
        broker = brokers[0]
        msg_filter = { 'filter_type' : 'header_match_str',
                       'filter_params' : { 'header_key' : "THE-GROUP",
                                           'header_value' : "A" }}
        rc = broker.queueMoveMessages("msg-group-q", "dest-q", 0, msg_filter)
        assert rc.status == 0

        # verify all other A's removed from msg-group-q
        s2 = self.setup_session()
        b1 = s2.receiver("msg-group-q", options={"capacity":0})
        count = 0
        try:
            while True:
                m2 = b1.fetch(0)
                assert m2.properties['THE-GROUP'] != 'A'
                count += 1
        except Empty:
            pass
        assert count == 3   # only 3 really available

        # verify the moved A's are at the dest-q
        s2 = self.setup_session()
        b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":0})
        count = 0
        try:
            while True:
                m2 = b1.fetch(0)
                assert m2.properties['THE-GROUP'] == 'A'
                assert m2.content['index'] == 2 or m2.content['index'] == 5
                count += 1
        except Empty:
            pass
        assert count == 2 # two A's moved

        s1.acknowledge()    # ack the consumed A-0
        self.qmf_session.delBroker(self.qmf_broker)

    def test_move_count(self):
        """ Verify we can move a fixed number of messages from an acquired group.
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","B","A","B","C","A"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        # set up destination queue
        rcvr = self.ssn.receiver("dest-q; {create:always, delete:receiver," +
                                 " node: {x-declare: {arguments:" +
                                 " {'qpid.group_header_key':'THE-GROUP'," +
                                 "'qpid.shared_msg_group':1}}}}")

        # now setup a QMF session, so we can move group B
        self.qmf_session = qmf.console.Session()
        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
        brokers = self.qmf_session.getObjects(_class="broker")
        assert len(brokers) == 1
        broker = brokers[0]
        msg_filter = { 'filter_type' : 'header_match_str',
                       'filter_params' : { 'header_key' : "THE-GROUP",
                                           'header_value' : "B" }}
        rc = broker.queueMoveMessages("msg-group-q", "dest-q", 3, msg_filter)
        assert rc.status == 0

        # verify all B's removed from msg-group-q
        s2 = self.setup_session()
        b1 = s2.receiver("msg-group-q; {mode: browse}", options={"capacity":0})
        count = 0
        try:
            while True:
                m2 = b1.fetch(0)
                assert m2.properties['THE-GROUP'] != 'B'
                count += 1
        except Empty:
            pass
        assert count == 4

        # verify the moved B's are at the dest-q
        s2 = self.setup_session()
        b1 = s2.receiver("dest-q; {mode: browse}", options={"capacity":0})
        count = 0
        try:
            while True:
                m2 = b1.fetch(0)
                assert m2.properties['THE-GROUP'] == 'B'
                assert m2.content['index'] == 1 or m2.content['index'] == 3
                count += 1
        except Empty:
            pass
        assert count == 2

        self.qmf_session.delBroker(self.qmf_broker)

    def test_reroute(self):
        """ Verify we can reroute messages from an acquired group.
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","B","A","B","C","A"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        # create a topic exchange for the reroute
        rcvr = self.ssn.receiver("reroute-q; {create: always, delete:receiver," +
                                 " node: {type: topic}}")

        # acquire group "A"
        s1 = self.setup_session()
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        # now setup a QMF session, so we can reroute group A
        self.qmf_session = qmf.console.Session()
        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
        queue = self.qmf_session.getObjects(_class="queue", name="msg-group-q")[0]
        assert queue
        msg_filter = { 'filter_type' : 'header_match_str',
                       'filter_params' : { 'header_key' : "THE-GROUP",
                                           'header_value' : "A" }}
        assert queue.msgDepth == 6
        rc = queue.reroute(0, False, "reroute-q", msg_filter)
        assert rc.status == 0
        queue.update()
        queue.msgDepth == 4   # the pending acquired A still counts!

        # verify all other A's removed....
        s2 = self.setup_session()
        b1 = s2.receiver("msg-group-q", options={"capacity":0})
        count = 0
        try:
            while True:
                m2 = b1.fetch(0)
                assert m2.properties['THE-GROUP'] != 'A'
                count += 1
        except Empty:
            pass
        assert count == 3   # only 3 really available

        # and what of reroute-q?
        count = 0
        try:
            while True:
                m2 = rcvr.fetch(0)
                assert m2.properties['THE-GROUP'] == 'A'
                assert m2.content['index'] == 2 or m2.content['index'] == 5
                count += 1
        except Empty:
            pass
        assert count == 2

        s1.acknowledge()    # ack the consumed A-0
        self.qmf_session.delBroker(self.qmf_broker)

    def test_queue_delete(self):
        """ Test deleting a queue while consumers are active.
        """

        ## Create a msg group queue

        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","B","A","B","C"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        ## Queue = A-0, B-1, A-2, b-3, C-4
        ## Owners= ---, ---, ---, ---, ---

        # create consumers
        s1 = self.setup_session()
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        s2 = self.setup_session()
        c2 = s2.receiver("msg-group-q", options={"capacity":0})

        # C1 should acquire A-0
        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        # c2 acquires B-1
        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'B'
        assert m2.content['index'] == 1

        # with group A and B owned, and C free, delete the
        # queue
        snd.close()
        self.ssn.close()

    def test_default_group_id(self):
        """ Verify the queue assigns the default group id should a message
        arrive without a group identifier.
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        m = Message(content={}, properties={"NO-GROUP-HEADER":"HA-HA"})
        snd.send(m)

        # now setup a QMF session, so we can call methods
        self.qmf_session = qmf.console.Session()
        self.qmf_broker = self.qmf_session.addBroker(str(self.broker))
        brokers = self.qmf_session.getObjects(_class="broker")
        assert len(brokers) == 1
        broker = brokers[0]

        # grab the group state off the queue, and verify the default group is
        # present ("qpid.no-group" is the broker default)
        rc = broker.query("queue", "msg-group-q")
        assert rc.status == 0
        assert rc.text == "OK"
        results = rc.outArgs['results']
        assert 'qpid.message_group_queue' in results
        q_info = results['qpid.message_group_queue']
        assert 'group_header_key' in q_info and q_info['group_header_key'] == "THE-GROUP"
        assert 'group_state' in q_info and len(q_info['group_state']) == 1
        g_info = q_info['group_state'][0]
        assert 'group_id' in g_info
        assert g_info['group_id'] == 'qpid.no-group'

        self.qmf_session.delBroker(self.qmf_broker)


    def test_transaction_order(self):
        """ Verify that rollback does not reorder the messages with respect to
        the consumer (QPID-3804)
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","B","A"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            snd.send(m)

        s1 = self.conn.session(transactional=True)
        c1 = s1.receiver("msg-group-q", options={"capacity":0})

        # C1 gets group A
        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0
        s1.acknowledge(m1)

        s1.rollback()  # release A back to the queue

        # the order should be preserved as follows:

        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        m2 = c1.fetch(0)
        assert m2.properties['THE-GROUP'] == 'B'
        assert m2.content['index'] == 1

        m3 = c1.fetch(0)
        assert m3.properties['THE-GROUP'] == 'A'
        assert m3.content['index'] == 2

        s1.commit()

        c1.close()
        s1.close()
        snd.close()


    def test_ttl_expire(self):
        """ Verify that expired (TTL) group messages are skipped correctly
        """
        snd = self.ssn.sender("msg-group-q; {create:always, delete:sender," +
                              " node: {x-declare: {arguments:" +
                              " {'qpid.group_header_key':'THE-GROUP'," +
                              "'qpid.shared_msg_group':1}}}}")

        groups = ["A","B","C","A","B","C"]
        messages = [Message(content={}, properties={"THE-GROUP": g}) for g in groups]
        index = 0
        for m in messages:
            m.content['index'] = index
            index += 1
            if m.properties['THE-GROUP'] == 'B':
                m.ttl = 1
            snd.send(m)

        sleep(2)  # let all B's expire

        # create consumers on separate sessions: C1,C2
        s1 = self.setup_session()
        c1 = s1.receiver("msg-group-q", options={"capacity":0})
        s2 = self.setup_session()
        c2 = s2.receiver("msg-group-q", options={"capacity":0})

        # C1 should acquire A-0, then C2 should acquire C-2, Group B should
        # expire and never be fetched

        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 0

        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'C'
        assert m2.content['index'] == 2

        m1 = c1.fetch(0)
        assert m1.properties['THE-GROUP'] == 'A'
        assert m1.content['index'] == 3

        m2 = c2.fetch(0)
        assert m2.properties['THE-GROUP'] == 'C'
        assert m2.content['index'] == 5

        # there should be no more left for either consumer
        try:
            mx = c1.fetch(0)
            assert False     # should never get here
        except Empty:
            pass
        try:
            mx = c2.fetch(0)
            assert False     # should never get here
        except Empty:
            pass

        c1.session.acknowledge()
        c2.session.acknowledge()
        c1.close()
        c2.close()
        snd.close()


class StickyConsumerMsgGroupTests(Base):
    """
    Tests for the behavior of sticky-consumer message groups.  These tests
    expect all messages from the same group be consumed by the same clients.
    See QPID-3347 for details.
    """
    pass  # TBD
