in qpid_tests/broker_0_10/threshold.py [0:0]
def test_hysteresis(self):
    """Drive queue "thq" through a scripted depth sequence and check events.

    The queue is declared with 'qpid.alert_count_up' set to 10 and
    'qpid.alert_count_down' set to 5. Two receivers are subscribed to the
    queueThresholdCrossedUpward / queueThresholdCrossedDownward QMF event
    topics. After every enqueue or dequeue step, check_events is called
    on each of those receivers with the number of events expected from
    that step (presumably it asserts that exactly that many events
    arrived -- the helper is defined outside this block).
    """
    thresholds = "'qpid.alert_count_up':10,'qpid.alert_count_down':5"
    up_events = self.ssn.receiver("qmf.default.topic/agent.ind.event.org_apache_qpid_broker.queueThresholdCrossedUpward.#")
    down_events = self.ssn.receiver("qmf.default.topic/agent.ind.event.org_apache_qpid_broker.queueThresholdCrossedDownward.#")
    producer = self.ssn.sender("thq; {create:always, node: {x-declare:{auto_delete:True,exclusive:True,arguments:{%s}}}}" % thresholds)
    consumer = self.ssn.receiver("thq")
    up_events.capacity = 5
    down_events.capacity = 5
    consumer.capacity = 5

    # Each row is (depth change, expected upward events, expected downward
    # events). A positive change enqueues that many messages; a negative
    # change dequeues that many. The trailing comment is the queue depth
    # once the step has been applied.
    script = (
        (+8, 0, 0),   # depth = 8
        (-6, 0, 0),   # depth = 2
        (+8, 1, 0),   # depth = 10
        (-1, 0, 0),   # depth = 9
        (+1, 0, 0),   # depth = 10
        (+10, 0, 0),  # depth = 20
        (-5, 0, 0),   # depth = 15
        (-12, 0, 1),  # depth = 3
        (-1, 0, 0),   # depth = 2
        (+6, 0, 0),   # depth = 8
        (+6, 1, 0),   # depth = 14
        (-9, 0, 1),   # depth = 5
        (+1, 0, 0),   # depth = 6
        (-1, 0, 0),   # depth = 5
        (-5, 0, 0),   # depth = 0
    )

    for change, expected_up, expected_down in script:
        if change > 0:
            self.enqueue(producer, change)
        else:
            self.dequeue(consumer, -change)
        # Upward receiver is always checked before the downward one.
        self.check_events(up_events, expected_up)
        self.check_events(down_events, expected_down)