qpid_tests/broker_0_10/stats.py (380 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from __future__ import absolute_import from __future__ import print_function from qpid.tests.messaging.implementation import * from qpid.tests.messaging import Base from time import sleep from qpidtoollibs.broker import BrokerAgent # # Tests the Broker's statistics reporting # class BrokerStatsTests(Base): """ Tests of the broker's statistics """ def assertEqual(self, left, right, text=None): if not left == right: print("assertEqual failure: %r != %r" % (left, right)) if text: print(" %r" % text) assert None def failUnless(self, value, text=None): if value: return print("failUnless failure", end=' ') if text: print(": %r" % text) else: print() assert None def fail(self, text=None): if text: print("Fail: %r" % text) assert None def setup_connection(self): return Connection.establish(self.broker, **self.connection_options()) def setup_session(self, tx=False): return self.conn.session(transactional=tx) def setup_access(self): return BrokerAgent(self.conn) def test_exchange_stats(self): agent = self.setup_access() start_broker = agent.getBroker() agent.addExchange("direct", "stats-test-exchange") try: sess = self.setup_session() tx_a = sess.sender("stats-test-exchange/a") tx_b = sess.sender("stats-test-exchange/b") rx_a = sess.receiver("stats-test-exchange/a") exchange = agent.getExchange("stats-test-exchange") self.failUnless(exchange, "expected a valid exchange object") self.assertEqual(exchange.msgReceives, 0, "msgReceives") self.assertEqual(exchange.msgDrops, 0, "msgDrops") self.assertEqual(exchange.msgRoutes, 0, "msgRoutes") self.assertEqual(exchange.byteReceives, 0, "byteReceives") self.assertEqual(exchange.byteDrops, 0, "byteDrops") self.assertEqual(exchange.byteRoutes, 0, "byteRoutes") tx_a.send("0123456789") tx_b.send("01234567890123456789") tx_a.send("012345678901234567890123456789") tx_b.send("0123456789012345678901234567890123456789") overhead = 63 #overhead added to message from headers exchange.update() self.assertEqual(exchange.msgReceives, 4, "msgReceives") self.assertEqual(exchange.msgDrops, 2, "msgDrops") self.assertEqual(exchange.msgRoutes, 2, "msgRoutes") self.assertEqual(exchange.byteReceives, 100+(4*overhead), "byteReceives") self.assertEqual(exchange.byteDrops, 60+(2*overhead), "byteDrops") self.assertEqual(exchange.byteRoutes, 40+(2*overhead), "byteRoutes") finally: agent.delExchange("stats-test-exchange") def test_enqueues_dequeues(self): agent = self.setup_access() start_broker = agent.getBroker() sess = self.setup_session() tx = sess.sender("enqueue_test;{create:always,delete:always}") rx = sess.receiver("enqueue_test") queue = agent.getQueue("enqueue_test") self.failUnless(queue, "expected a valid queue object") self.assertEqual(queue.msgTotalEnqueues, 0, "msgTotalEnqueues") self.assertEqual(queue.byteTotalEnqueues, 0, "byteTotalEnqueues") self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues") self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues") self.assertEqual(queue.msgDepth, 0, "msgDepth") self.assertEqual(queue.byteDepth, 0, "byteDepth") tx.send("0123456789") tx.send("01234567890123456789") tx.send("012345678901234567890123456789") tx.send("0123456789012345678901234567890123456789") overhead = 38 #overhead added to message from headers queue.update() self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues") self.assertEqual(queue.byteTotalEnqueues, 100+(4*overhead), "byteTotalEnqueues") self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues") self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues") self.assertEqual(queue.msgDepth, 4, "msgDepth") self.assertEqual(queue.byteDepth, 100+(4*overhead), "byteDepth") now_broker = agent.getBroker() self.failUnless((now_broker.msgTotalEnqueues - start_broker.msgTotalEnqueues) >= 4, "broker msgTotalEnqueues") self.failUnless((now_broker.byteTotalEnqueues - start_broker.byteTotalEnqueues) >= 100, "broker byteTotalEnqueues") m = rx.fetch() m = rx.fetch() sess.acknowledge() queue.update() self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues") self.assertEqual(queue.byteTotalEnqueues, 100+(4*overhead), "byteTotalEnqueues") self.assertEqual(queue.msgTotalDequeues, 2, "msgTotalDequeues") self.assertEqual(queue.byteTotalDequeues, 30+(2*overhead), "byteTotalDequeues") self.assertEqual(queue.msgDepth, 2, "msgDepth") self.assertEqual(queue.byteDepth, 70+(2*overhead), "byteDepth") now_broker = agent.getBroker() self.failUnless((now_broker.msgTotalDequeues - start_broker.msgTotalDequeues) >= 2, "broker msgTotalDequeues") self.failUnless((now_broker.byteTotalDequeues - start_broker.byteTotalDequeues) >= 30, "broker byteTotalDequeues") sess.close() now_broker = agent.getBroker() self.assertEqual(now_broker.abandoned - start_broker.abandoned, 2, "expect 2 abandoned messages") self.assertEqual(now_broker.msgDepth, start_broker.msgDepth, "expect broker message depth to be unchanged") self.assertEqual(now_broker.byteDepth, start_broker.byteDepth, "expect broker byte depth to be unchanged") def test_transactional_enqueues_dequeues(self): agent = self.setup_access() start_broker = agent.getBroker() sess = self.setup_session(True) tx = sess.sender("tx_enqueue_test;{create:always,delete:always}") tx.send("0123456789") tx.send("0123456789") tx.send("0123456789") tx.send("0123456789") overhead = 41 #overhead added to message from headers queue = agent.getQueue("tx_enqueue_test") self.failUnless(queue, "expected a valid queue object") self.assertEqual(queue.msgTotalEnqueues, 0, "msgTotalEnqueues pre-tx-commit") self.assertEqual(queue.byteTotalEnqueues, 0, "byteTotalEnqueues pre-tx-commit") self.assertEqual(queue.msgTxnEnqueues, 0, "msgTxnEnqueues pre-tx-commit") self.assertEqual(queue.byteTxnEnqueues, 0, "byteTxnEnqueues pre-tx-commit") self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues pre-tx-commit") self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues pre-tx-commit") self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues pre-tx-commit") self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues pre-tx-commit") sess.commit() queue.update() self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues post-tx-commit") self.assertEqual(queue.byteTotalEnqueues, 40+(4*overhead), "byteTotalEnqueues post-tx-commit") self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues post-tx-commit") self.assertEqual(queue.byteTxnEnqueues, 40+(4*overhead), "byteTxnEnqueues post-tx-commit") self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues post-tx-commit") self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues post-tx-commit") self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues post-tx-commit") self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues post-tx-commit") sess2 = self.setup_session(True) rx = sess2.receiver("tx_enqueue_test") m = rx.fetch() m = rx.fetch() m = rx.fetch() m = rx.fetch() queue.update() self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues pre-rx-commit") self.assertEqual(queue.byteTotalEnqueues, 40+(4*overhead), "byteTotalEnqueues pre-rx-commit") self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues pre-rx-commit") self.assertEqual(queue.byteTxnEnqueues, 40+(4*overhead), "byteTxnEnqueues pre-rx-commit") self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues pre-rx-commit") self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues pre-rx-commit") self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues pre-rx-commit") self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues pre-rx-commit") sess2.acknowledge() sess2.commit() queue.update() self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues post-rx-commit") self.assertEqual(queue.byteTotalEnqueues, 40+(4*overhead), "byteTotalEnqueues post-rx-commit") self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues post-rx-commit") self.assertEqual(queue.byteTxnEnqueues, 40+(4*overhead), "byteTxnEnqueues post-rx-commit") self.assertEqual(queue.msgTotalDequeues, 4, "msgTotalDequeues post-rx-commit") self.assertEqual(queue.byteTotalDequeues, 40+(4*overhead), "byteTotalDequeues post-rx-commit") self.assertEqual(queue.msgTxnDequeues, 4, "msgTxnDequeues post-rx-commit") self.assertEqual(queue.byteTxnDequeues, 40+(4*overhead), "byteTxnDequeues post-rx-commit") sess.close() sess2.close() now_broker = agent.getBroker() self.assertEqual(now_broker.msgTxnEnqueues - start_broker.msgTxnEnqueues, 4, "broker msgTxnEnqueues") self.assertEqual(now_broker.byteTxnEnqueues - start_broker.byteTxnEnqueues, 40+(4*overhead), "broker byteTxnEnqueues") self.assertEqual(now_broker.msgTxnDequeues - start_broker.msgTxnDequeues, 4, "broker msgTxnDequeues") self.assertEqual(now_broker.byteTxnDequeues - start_broker.byteTxnDequeues, 40+(4*overhead), "broker byteTxnDequeues") def test_discards_no_route(self): agent = self.setup_access() start_broker = agent.getBroker() sess = self.setup_session() tx = sess.sender("amq.topic/non.existing.key") tx.send("NO_ROUTE") tx.send("NO_ROUTE") tx.send("NO_ROUTE") tx.send("NO_ROUTE") tx.send("NO_ROUTE") now_broker = agent.getBroker() self.failUnless((now_broker.discardsNoRoute - start_broker.discardsNoRoute) >= 5, "Expect at least 5 no-routes") sess.close() def test_abandoned_alt(self): agent = self.setup_access() start_broker = agent.getBroker() sess = self.setup_session() tx = sess.sender("abandon_alt;{create:always,delete:always,node:{x-declare:{alternate-exchange:'amq.fanout'}}}") rx = sess.receiver("abandon_alt") rx.capacity = 2 tx.send("ABANDON_ALT") tx.send("ABANDON_ALT") tx.send("ABANDON_ALT") tx.send("ABANDON_ALT") tx.send("ABANDON_ALT") rx.fetch() sess.close() now_broker = agent.getBroker() self.assertEqual(now_broker.abandonedViaAlt - start_broker.abandonedViaAlt, 5, "Expect 5 abandonedViaAlt") self.assertEqual(now_broker.abandoned - start_broker.abandoned, 0, "Expect 0 abandoned") def test_discards_ttl(self): agent = self.setup_access() start_broker = agent.getBroker() sess = self.setup_session() tx = sess.sender("discards_ttl;{create:always,delete:always}") msg = Message("TTL") msg.ttl = 1 tx.send(msg) tx.send(msg) tx.send(msg) tx.send(msg) tx.send(msg) tx.send(msg) sleep(2) rx = sess.receiver("discards_ttl") try: rx.fetch(0) except: pass now_broker = agent.getBroker() queue = agent.getQueue("discards_ttl") self.failUnless(queue, "expected a valid queue object") self.assertEqual(queue.discardsTtl, 6, "expect 6 TTL discards on queue") self.assertEqual(now_broker.discardsTtl - start_broker.discardsTtl, 6, "expect 6 TTL discards on broker") self.assertEqual(queue.msgTotalDequeues, 6, "expect 6 total dequeues on queue") sess.close() def test_discards_limit_overflow(self): agent = self.setup_access() start_broker = agent.getBroker() sess = self.setup_session() tx = sess.sender("discards_limit;{create:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0}}}}") tx.send("LIMIT") tx.send("LIMIT") tx.send("LIMIT") try: tx.send("LIMIT") self.fail("expected to fail sending 4th message") except: pass now_broker = agent.getBroker() queue = agent.getQueue("discards_limit") self.failUnless(queue, "expected a valid queue object") self.assertEqual(queue.discardsOverflow, 1, "expect 1 overflow discard on queue") self.assertEqual(now_broker.discardsOverflow - start_broker.discardsOverflow, 1, "expect 1 overflow discard on broker") ## ## Shut down and restart the connection to clear the error condition. ## try: self.conn.close(timeout=.1) except: pass self.conn = self.setup_connection() ## ## Re-create the session to delete the queue. ## sess = self.setup_session() tx = sess.sender("discards_limit;{create:always,delete:always}") sess.close() def test_discards_ring_overflow(self): agent = self.setup_access() start_broker = agent.getBroker() sess = self.setup_session() tx = sess.sender("discards_ring;{create:always,delete:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0,'qpid.policy_type':ring}}}}") tx.send("RING") tx.send("RING") tx.send("RING") tx.send("RING") tx.send("RING") now_broker = agent.getBroker() queue = agent.getQueue("discards_ring") self.failUnless(queue, "expected a valid queue object") self.assertEqual(queue.discardsRing, 2, "expect 2 ring discards on queue") self.assertEqual(now_broker.discardsRing - start_broker.discardsRing, 2, "expect 2 ring discards on broker") self.assertEqual(queue.msgTotalDequeues, 2, "expect 2 total dequeues on queue") sess.close() def test_discards_lvq_replace(self): agent = self.setup_access() start_broker = agent.getBroker() sess = self.setup_session() tx = sess.sender("discards_lvq;{create:always,delete:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0,'qpid.last_value_queue_key':key}}}}") msgA = Message("LVQ_A") msgA.properties['key'] = 'AAA' msgB = Message("LVQ_B") msgB.properties['key'] = 'BBB' tx.send(msgA) tx.send(msgB) tx.send(msgA) tx.send(msgA) tx.send(msgB) now_broker = agent.getBroker() queue = agent.getQueue("discards_lvq") self.failUnless(queue, "expected a valid queue object") self.assertEqual(queue.discardsLvq, 3, "expect 3 lvq discards on queue") self.assertEqual(now_broker.discardsLvq - start_broker.discardsLvq, 3, "expect 3 lvq discards on broker") self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue") sess.close() def test_discards_reject(self): agent = self.setup_access() start_broker = agent.getBroker() sess = self.setup_session() tx = sess.sender("discards_reject;{create:always,delete:always}") tx.send("REJECT") tx.send("REJECT") tx.send("REJECT") rx = sess.receiver("discards_reject") m = rx.fetch() sess.acknowledge() m1 = rx.fetch() m2 = rx.fetch() sess.acknowledge(m1, Disposition(REJECTED)) sess.acknowledge(m2, Disposition(REJECTED)) now_broker = agent.getBroker() queue = agent.getQueue("discards_reject") self.failUnless(queue, "expected a valid queue object") self.assertEqual(queue.discardsSubscriber, 2, "expect 2 reject discards on queue") self.assertEqual(now_broker.discardsSubscriber - start_broker.discardsSubscriber, 2, "expect 2 reject discards on broker") self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue") sess.close() def test_message_release(self): agent = self.setup_access() start_broker = agent.getBroker() sess = self.setup_session() tx = sess.sender("message_release;{create:always,delete:always}") tx.send("RELEASE") tx.send("RELEASE") tx.send("RELEASE") tx.send("RELEASE") tx.send("RELEASE") rx = sess.receiver("message_release") m1 = rx.fetch() m2 = rx.fetch() sess.acknowledge(m1, Disposition(RELEASED)) sess.acknowledge(m2, Disposition(RELEASED)) now_broker = agent.getBroker() queue = agent.getQueue("message_release") self.failUnless(queue, "expected a valid queue object") self.assertEqual(queue.acquires, 2, "expect 2 acquires on queue") self.failUnless(now_broker.acquires - start_broker.acquires >= 2, "expect at least 2 acquires on broker") self.assertEqual(queue.msgTotalDequeues, 0, "expect 0 total dequeues on queue") self.assertEqual(queue.releases, 2, "expect 2 releases on queue") self.failUnless(now_broker.releases - start_broker.releases >= 2, "expect at least 2 releases on broker") sess.close() def test_discards_purge(self): agent = self.setup_access() start_broker = agent.getBroker() sess = self.setup_session() tx = sess.sender("discards_purge;{create:always,delete:always}") tx.send("PURGE") tx.send("PURGE") tx.send("PURGE") tx.send("PURGE") tx.send("PURGE") queue = agent.getQueue("discards_purge") self.failUnless(queue, "expected a valid queue object") queue.purge(3) queue.update() now_broker = agent.getBroker() self.assertEqual(queue.discardsPurge, 3, "expect 3 purge discards on queue") self.assertEqual(now_broker.discardsPurge - start_broker.discardsPurge, 3, "expect 3 purge discards on broker") self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue") sess.close() def test_reroutes(self): agent = self.setup_access() start_broker = agent.getBroker() sess = self.setup_session() tx = sess.sender("reroute;{create:always,delete:always}") tx.send("REROUTE") tx.send("REROUTE") tx.send("REROUTE") tx.send("REROUTE") tx.send("REROUTE") tx.send("REROUTE") tx.send("REROUTE") tx.send("REROUTE") queue = agent.getQueue("reroute") self.failUnless(queue, "expected a valid queue object") queue.reroute(5, False, 'amq.fanout') queue.update() now_broker = agent.getBroker() self.assertEqual(queue.reroutes, 5, "expect 5 reroutes on queue") self.assertEqual(now_broker.reroutes - start_broker.reroutes, 5, "expect 5 reroutes on broker") self.assertEqual(queue.msgTotalDequeues, 5, "expect 5 total dequeues on queue") sess.close()