def test_methods_async()

in qpid_tests/broker_0_10/management.py [0:0]


    def test_methods_async (self):
        """
        """
        class Handler (qmf.console.Console):
            def __init__(self):
                self.cv = Condition()
                self.xmtList = {}
                self.rcvList = {}

            def methodResponse(self, broker, seq, response):
                self.cv.acquire()
                try:
                    self.rcvList[seq] = response
                finally:
                    self.cv.release()

            def request(self, broker, count):
                self.count = count
                for idx in range(count):
                    self.cv.acquire()
                    try:
                        seq = broker.echo(idx, "Echo Message", _async = True)
                        self.xmtList[seq] = idx
                    finally:
                        self.cv.release()

            def check(self):
                if self.count != len(self.xmtList):
                    return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList))
                lost = 0
                mismatched = 0
                for seq in self.xmtList:
                    value = self.xmtList[seq]
                    if seq in self.rcvList:
                        result = self.rcvList.pop(seq)
                        if result.sequence != value:
                            mismatched += 1
                    else:
                        lost += 1
                spurious = len(self.rcvList)
                if lost == 0 and mismatched == 0 and spurious == 0:
                    return "pass"
                else:
                    return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious)

        handler = Handler()
        self.startQmf(handler)
        brokers = self.qmf.getObjects(_class="broker")
        self.assertEqual(len(brokers), 1)
        broker = brokers[0]
        handler.request(broker, 20)
        sleep(1)
        self.assertEqual(handler.check(), "pass")