in qpid_tests/broker_0_10/management.py [0:0]
def test_methods_async(self):
    """
    Issue a batch of asynchronous ``echo`` calls and verify their responses.

    A local console handler sends 20 ``echo`` requests to the single broker
    object with ``_async=True``, remembering which index was sent under each
    returned sequence number.  Responses delivered to ``methodResponse`` are
    recorded by sequence number.  The test passes when every request got
    exactly one response whose ``sequence`` field equals the index that was
    sent, and no response arrived for an unknown sequence number.

    Instead of sleeping for a fixed interval, the test blocks on the
    handler's condition variable until all expected responses have been
    recorded or a timeout elapses; a timeout surfaces as "lost" responses
    in the failure message produced by ``check()``.
    """
    import time  # local import: only the deadline arithmetic below needs it

    # Prefer a monotonic clock where the interpreter provides one so that
    # wall-clock adjustments cannot stretch or shorten the timeout.
    clock = getattr(time, "monotonic", time.time)

    class Handler(qmf.console.Console):
        """Console that tracks async echo requests and their responses."""

        def __init__(self):
            # Guards the bookkeeping below.  NOTE(review): responses are
            # presumably delivered on a QMF callback thread, which is why
            # the dictionaries are protected -- confirm against the console.
            self.cv = Condition()
            self.count = 0      # number of requests attempted by request()
            self.xmtList = {}   # sequence number -> index that was sent
            self.rcvList = {}   # sequence number -> response object

        def methodResponse(self, broker, seq, response):
            """Record a response and wake any thread waiting for it."""
            with self.cv:
                self.rcvList[seq] = response
                self.cv.notify_all()

        def request(self, broker, count):
            """Send ``count`` asynchronous echo requests to ``broker``."""
            self.count = count
            for idx in range(count):
                # The lock is held across the send so the sequence number is
                # registered before a response for it can be recorded.
                with self.cv:
                    seq = broker.echo(idx, "Echo Message", _async=True)
                    self.xmtList[seq] = idx

        def waitForResponses(self, timeout):
            """
            Block until at least ``count`` responses have been recorded.

            Returns True if they all arrived within ``timeout`` seconds,
            False if the timeout expired first.
            """
            deadline = clock() + timeout
            with self.cv:
                while len(self.rcvList) < self.count:
                    remaining = deadline - clock()
                    if remaining <= 0:
                        return False
                    self.cv.wait(remaining)
                return True

        def check(self):
            """
            Compare sent requests with received responses.

            Returns "pass" on success, otherwise a "fail (...)" string
            describing what went wrong.  The recorded state is not
            modified, so the method may be called repeatedly.
            """
            # Work on snapshots taken under the lock so a late response
            # cannot change the dictionaries while they are being compared.
            with self.cv:
                expected = self.count
                sent = dict(self.xmtList)
                received = dict(self.rcvList)

            if expected != len(sent):
                return "fail (attempted send=%d, actual sent=%d)" % (expected, len(sent))

            lost = 0
            mismatched = 0
            for seq, value in sent.items():
                if seq not in received:
                    lost += 1
                elif received[seq].sequence != value:
                    mismatched += 1
            # Responses whose sequence number was never handed out.
            spurious = len([seq for seq in received if seq not in sent])

            if lost == 0 and mismatched == 0 and spurious == 0:
                return "pass"
            return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious)

    handler = Handler()
    self.startQmf(handler)
    brokers = self.qmf.getObjects(_class="broker")
    self.assertEqual(len(brokers), 1)
    broker = brokers[0]
    handler.request(broker, 20)
    # Bounded wait: returns as soon as every response is in, and tolerates a
    # slow broker far better than a fixed one-second sleep.
    handler.waitForResponses(10)
    self.assertEqual(handler.check(), "pass")