def _tryToConnect()

in management/python/lib/qmf/console.py [0:0]


  def _tryToConnect(self):
    """ Connect to the broker.  Returns True if connection setup completes
    successfully, otherwise returns False and sets self.error/self.conn_exc
    with error info.  Does not raise exceptions.
    """
    self.error = None
    self.conn_exc = None
    try:
      try:
        self.cv.acquire()
        self.agents = {}
      finally:
        self.cv.release()

      self.topicBound = False
      self.syncInFlight = False
      self.syncRequest = 0
      self.syncResult = None
      self.reqsOutstanding = 1

      try:
        if self.amqpSession:
          self.amqpSession.close()
      except:
        pass
      self.amqpSession = None

      try:
        if self.conn:
          self.conn.close(5)
      except:
        pass
      self.conn = None

      sock = connect(self.host, self.port)
      sock.settimeout(5)
      oldTimeout = sock.gettimeout()
      sock.settimeout(self.connTimeout)
      connSock = None
      force_blocking = False
      if self.ssl:
        # Bug (QPID-4337): the "old" implementation of python SSL
        # fails if the socket is set to non-blocking (which settimeout()
        # may change).
        if sys.version_info[:2] < (2, 6):  # 2.6+ uses openssl - it's ok
          force_blocking = True
          sock.setblocking(1)
        certfile = None
        if 'ssl_certfile' in self.connectArgs:
          certfile = self.connectArgs['ssl_certfile']
        keyfile = None
        if 'ssl_keyfile' in self.connectArgs:
          keyfile = self.connectArgs['ssl_keyfile']
        connSock = ssl(sock, certfile=certfile, keyfile=keyfile)
      else:
        connSock = sock
      if not 'service' in self.connectArgs:
          self.connectArgs['service'] = 'qpidd'
      self.conn = Connection(connSock, username=self.authUser, password=self.authPass,
                             mechanism = self.mechanisms, host=self.host,
                             **self.connectArgs)
      def aborted():
        raise Timeout("Waiting for connection to be established with broker")
      oldAborted = self.conn.aborted
      self.conn.aborted = aborted
      self.conn.start()
      
      # Bug (QPID-4337): don't enable non-blocking (timeouts) for old SSL
      if not force_blocking:
        sock.settimeout(oldTimeout)
      self.conn.aborted = oldAborted
      uid = self.conn.user_id
      if uid.__class__ == tuple and len(uid) == 2:
        self.saslUser = uid[1]
      elif type(uid) is str:
        self.saslUser = uid;
      else:
        self.saslUser = None

      # prevent topic queues from filling up (and causing the agents to
      # disconnect) by discarding the oldest queued messages when full.
      topic_queue_options = {"qpid.policy_type":"ring"}

      self.replyName = "reply-%s" % self.amqpSessionId
      self.amqpSession = self.conn.session(self.amqpSessionId)
      self.amqpSession.timeout = self.sessTimeout
      self.amqpSession.auto_sync = True
      self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True)
      self.amqpSession.exchange_bind(exchange="amq.direct",
                                     queue=self.replyName, binding_key=self.replyName)
      self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest",
                                         accept_mode=self.amqpSession.accept_mode.none,
                                         acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
      self.amqpSession.incoming("rdest").listen(self._v1Cb, self._exceptionCb)
      self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=self.amqpSession.flow_mode.window)
      self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFF)
      self.amqpSession.message_flow(destination="rdest", unit=self.amqpSession.credit_unit.message, value=200)

      self.topicName = "topic-%s" % self.amqpSessionId
      self.amqpSession.queue_declare(queue=self.topicName, exclusive=True,
                                     auto_delete=True,
                                     arguments=topic_queue_options)
      self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest",
                                         accept_mode=self.amqpSession.accept_mode.none,
                                         acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
      self.amqpSession.incoming("tdest").listen(self._v1Cb, self._exceptionCb)
      self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=self.amqpSession.flow_mode.window)
      self.amqpSession.message_flow(destination="tdest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFF)
      self.amqpSession.message_flow(destination="tdest", unit=self.amqpSession.credit_unit.message, value=200)

      ##
      ## Check to see if the broker has QMFv2 exchanges configured
      ##
      direct_result = self.amqpSession.exchange_query("qmf.default.direct")
      topic_result = self.amqpSession.exchange_query("qmf.default.topic")
      self.brokerSupportsV2 = not (direct_result.not_found or topic_result.not_found)

      try:
        self.cv.acquire()
        self.agents = {}
        self.brokerAgent = Agent(self, 0, "BrokerAgent", isV2=self.brokerSupportsV2)
        self.agents['0'] = self.brokerAgent
      finally:
        self.cv.release()

      ##
      ## Set up connectivity for QMFv2
      ##
      if self.brokerSupportsV2:
        # set up 3 queues:
        # 1 direct queue - for responses destined to this console.
        # 2 topic queues - one for heartbeats (hb), one for unsolicited data
        # and event indications (ui).
        self.v2_direct_queue = "qmfc-v2-%s" % self.amqpSessionId
        self.amqpSession.queue_declare(queue=self.v2_direct_queue, exclusive=True, auto_delete=True)
        self.v2_topic_queue_ui = "qmfc-v2-ui-%s" % self.amqpSessionId
        self.amqpSession.queue_declare(queue=self.v2_topic_queue_ui,
                                       exclusive=True, auto_delete=True,
                                       arguments=topic_queue_options)
        self.v2_topic_queue_hb = "qmfc-v2-hb-%s" % self.amqpSessionId
        self.amqpSession.queue_declare(queue=self.v2_topic_queue_hb,
                                       exclusive=True, auto_delete=True,
                                       arguments=topic_queue_options)

        self.amqpSession.exchange_bind(exchange="qmf.default.direct",
                                       queue=self.v2_direct_queue, binding_key=self.v2_direct_queue)
        ## Other bindings here...

        self.amqpSession.message_subscribe(queue=self.v2_direct_queue, destination="v2dest",
                                           accept_mode=self.amqpSession.accept_mode.none,
                                           acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
        self.amqpSession.incoming("v2dest").listen(self._v2Cb, self._exceptionCb)
        self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=self.amqpSession.flow_mode.window)
        self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFF)
        self.amqpSession.message_flow(destination="v2dest", unit=self.amqpSession.credit_unit.message, value=50)

        self.amqpSession.message_subscribe(queue=self.v2_topic_queue_ui, destination="v2TopicUI",
                                           accept_mode=self.amqpSession.accept_mode.none,
                                           acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
        self.amqpSession.incoming("v2TopicUI").listen(self._v2Cb, self._exceptionCb)
        self.amqpSession.message_set_flow_mode(destination="v2TopicUI", flow_mode=self.amqpSession.flow_mode.window)
        self.amqpSession.message_flow(destination="v2TopicUI", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFF)
        self.amqpSession.message_flow(destination="v2TopicUI", unit=self.amqpSession.credit_unit.message, value=25)


        self.amqpSession.message_subscribe(queue=self.v2_topic_queue_hb, destination="v2TopicHB",
                                           accept_mode=self.amqpSession.accept_mode.none,
                                           acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
        self.amqpSession.incoming("v2TopicHB").listen(self._v2Cb, self._exceptionCb)
        self.amqpSession.message_set_flow_mode(destination="v2TopicHB", flow_mode=self.amqpSession.flow_mode.window)
        self.amqpSession.message_flow(destination="v2TopicHB", unit=self.amqpSession.credit_unit.byte, value=0xFFFFFFFF)
        self.amqpSession.message_flow(destination="v2TopicHB", unit=self.amqpSession.credit_unit.message, value=100)

      codec = Codec()
      self._setHeader(codec, 'B')
      msg = self._message(codec.encoded)
      self._send(msg)

      return True  # connection complete

    except Exception as e:
      self.error = "Exception during connection setup: %s - %s" % (e.__class__.__name__, e)
      self.conn_exc = e
      if self.session.console:
        self.session.console.brokerConnectionFailed(self)
      return False     # connection failed