in management/python/lib/qmf/console.py [0:0]
def run(self):
    """ Main body of the running thread.

    The thread runs in two phases:

    1. Initial connection.  Connection attempts are repeated with a growing
       delay until one succeeds or the thread is canceled.  When the session
       does not manage connections (``session.manageConnections`` is false)
       the first failed attempt is final.  On every path out of this phase
       ``self.ready`` is released exactly once, which is what wakes up the
       constructor waiting on it.
    2. Receive loop.  Items are taken from ``self.rcv_queue`` and dispatched
       by typecode (items with any other typecode are silently skipped).
       Whenever the queue has been drained and the thread is not canceled,
       ``_ageAgents()`` is called; the blocking ``get`` uses
       ``session.agent_heartbeat_min`` as its timeout, so this also happens
       when no traffic arrives.  If ``self.connected`` is found to be false
       while an item is pending, that is treated as a connection failure:
       the queue is flushed, the session is notified, and (for managed
       connections only) the connection is re-established with the same
       backoff policy as in phase 1.

    The method returns when ``self.canceled`` becomes true, or when an
    unmanaged connection fails.
    """
    # First, attempt a connection.  In the unmanaged case, failure to
    # connect needs to cause the Broker() constructor to raise an
    # exception, so wake it up without ever marking ourselves connected.
    if not self._connectWithBackoff(giveUpIfUnmanaged=True) or self.canceled:
        self.ready.release()
        return

    # connection successful!
    self._markConnected()
    self.ready.release()

    while not self.canceled:
        # Block for at most one heartbeat interval so the agents still get
        # aged (below) when the queue stays empty.
        try:
            item = self.rcv_queue.get(timeout=self.session.agent_heartbeat_min)
        except Empty:
            item = None

        while not self.canceled and item is not None:
            if not self.connected:
                # connection failure
                while item:
                    # drain the queue - whatever is pending is discarded
                    try:
                        item = self.rcv_queue.get(block=False)
                    except Empty:
                        item = None
                        break

                self._disconnect()  # clean up any pending agents
                self.session._handleError(self.error)
                self.session._handleBrokerDisconnect(self)

                if not self.session.manageConnections:
                    return  # do not attempt recovery

                # retry connection setup; the managed/unmanaged decision
                # was made just above, so the helper retries until it
                # succeeds or the thread is canceled.
                if not self._connectWithBackoff() or self.canceled:
                    return

                # connection successful!
                self._markConnected()

            elif item.typecode == Broker._q_item.type_v1msg:
                self._v1Dispatch(item.data)
            elif item.typecode == Broker._q_item.type_v2msg:
                self._v2Dispatch(item.data)

            # Fetch the next pending item without blocking; an empty queue
            # ends the inner loop.
            try:
                item = self.rcv_queue.get(block=False)
            except Empty:
                item = None

        # queue drained, age the agents...
        if not self.canceled:
            self._ageAgents()


def _connectWithBackoff(self, giveUpIfUnmanaged=False):
    """ Call _tryToConnect() until it succeeds, backing off between tries.

    The wait between attempts starts at DELAY_MIN seconds and is multiplied
    by DELAY_FACTOR after every failure.  Growth is clamped to DELAY_MAX
    (the previous inline loops could overshoot it when DELAY_MAX was not an
    exact multiple of the growing delay).  The wait is slept in one-second
    slices so that setting ``self.canceled`` interrupts it within about a
    second.

    giveUpIfUnmanaged -- when true, a failed attempt is final if
                         ``session.manageConnections`` is false.

    Returns True as soon as _tryToConnect() reports success.  Returns False
    if the thread was canceled first, or if the attempt failed and
    giveUpIfUnmanaged applied.
    """
    delay = self.DELAY_MIN
    while not self.canceled:
        if self._tryToConnect():  # connection up
            return True
        # unmanaged connection - fail, caller wakes up whoever is waiting
        if giveUpIfUnmanaged and not self.session.manageConnections:
            return False
        # managed connection - try again after the current delay
        count = 0
        while not self.canceled and count < delay:
            sleep(1)
            count += 1
        if delay < self.DELAY_MAX:
            delay = min(delay * self.DELAY_FACTOR, self.DELAY_MAX)
    return False


def _markConnected(self):
    """ Set ``self.connected`` under ``self.cv`` and notify the session.

    The flag is written while holding ``self.cv``; the session callback
    ``_handleBrokerConnect`` is invoked only after the lock is released.
    """
    self.cv.acquire()
    try:
        self.connected = True
    finally:
        self.cv.release()
    self.session._handleBrokerConnect(self)