def run()

in management/python/lib/qmf/console.py [0:0]


  def run(self):
    """ Main body of the running thread. """

    # First, attempt a connection.  In the unmanaged case,
    # failure to connect needs to cause the Broker()
    # constructor to raise an exception.
    delay = self.DELAY_MIN
    while not self.canceled:
      if self._tryToConnect(): # connection up
        break
      # unmanaged connection - fail & wake up constructor
      if not self.session.manageConnections:
        self.ready.release()
        return
      # managed connection - try again
      count = 0
      while not self.canceled and count < delay:
        sleep(1)
        count += 1
      if delay < self.DELAY_MAX:
        delay *= self.DELAY_FACTOR

    if self.canceled:
      self.ready.release()
      return

    # connection successful!
    self.cv.acquire()
    try:
      self.connected = True
    finally:
      self.cv.release()

    self.session._handleBrokerConnect(self)
    self.ready.release()

    while not self.canceled:

      try:
        item = self.rcv_queue.get(timeout=self.session.agent_heartbeat_min)
      except Empty:
        item = None

      while not self.canceled and item is not None:

        if not self.connected:
          # connection failure
          while item:
            # drain the queue
            try:
              item = self.rcv_queue.get(block=False)
            except Empty:
              item = None
              break

          self._disconnect()  # clean up any pending agents
          self.session._handleError(self.error)
          self.session._handleBrokerDisconnect(self)

          if not self.session.manageConnections:
            return  # do not attempt recovery

          # retry connection setup
          delay = self.DELAY_MIN
          while not self.canceled:
            if self._tryToConnect():
              break
            # managed connection - try again
            count = 0
            while not self.canceled and count < delay:
              sleep(1)
              count += 1
            if delay < self.DELAY_MAX:
              delay *= self.DELAY_FACTOR

          if self.canceled:
            return

          # connection successful!
          self.cv.acquire()
          try:
            self.connected = True
          finally:
            self.cv.release()

          self.session._handleBrokerConnect(self)

        elif item.typecode == Broker._q_item.type_v1msg:
          self._v1Dispatch(item.data)
        elif item.typecode == Broker._q_item.type_v2msg:
          self._v2Dispatch(item.data)

        try:
          item = self.rcv_queue.get(block=False)
        except Empty:
          item = None

      # queue drained, age the agents...
      if not self.canceled:
        self._ageAgents()