in management/python/lib/qmf/console.py [0:0]
def _tryToConnect(self):
    """Connect to the broker and set up the AMQP session used by the console.

    Resets the per-connection state, closes any previous session and
    connection (best effort), opens a new socket (optionally wrapped for
    SSL), starts the connection, declares and subscribes to the reply and
    topic queues, adds the QMFv2 queues when the broker reports both
    qmf.default.direct and qmf.default.topic exchanges, and finally sends
    an initial 'B' request.

    Returns True if connection setup completes successfully. On any
    Exception it sets self.error / self.conn_exc with the error info,
    notifies self.session.console.brokerConnectionFailed(self) when a
    console is registered, and returns False.

    Exceptions derived from Exception are not raised out of the setup
    steps. NOTE(review): the brokerConnectionFailed() callback itself is
    not guarded, so an exception raised by the console handler would still
    propagate - confirm that is intended.
    """
    self.error = None
    self.conn_exc = None
    sock = None
    try:
        # Acquire *before* entering the try/finally: if acquire() fails,
        # release() must not be called on a lock that is not held (that
        # would replace the real error with an unrelated one).
        self.cv.acquire()
        try:
            self.agents = {}
        finally:
            self.cv.release()

        self.topicBound = False
        self.syncInFlight = False
        self.syncRequest = 0
        self.syncResult = None
        self.reqsOutstanding = 1

        # Best-effort teardown of whatever is left from a previous attempt.
        # Failures are deliberately ignored, but only Exception is
        # swallowed so KeyboardInterrupt/SystemExit still propagate.
        try:
            if self.amqpSession:
                self.amqpSession.close()
        except Exception:
            pass
        self.amqpSession = None
        try:
            if self.conn:
                self.conn.close(5)
        except Exception:
            pass
        self.conn = None

        sock = connect(self.host, self.port)
        sock.settimeout(5)
        # Remember the timeout just configured so it can be restored once
        # the connection handshake (which uses connTimeout) has finished.
        oldTimeout = sock.gettimeout()
        sock.settimeout(self.connTimeout)
        force_blocking = False
        if self.ssl:
            # Bug (QPID-4337): the "old" implementation of python SSL
            # fails if the socket is set to non-blocking (which settimeout()
            # may change).
            if sys.version_info[:2] < (2, 6):  # 2.6+ uses openssl - it's ok
                force_blocking = True
                sock.setblocking(1)
            if 'ssl_certfile' in self.connectArgs:
                certfile = self.connectArgs['ssl_certfile']
            else:
                certfile = None
            if 'ssl_keyfile' in self.connectArgs:
                keyfile = self.connectArgs['ssl_keyfile']
            else:
                keyfile = None
            connSock = ssl(sock, certfile=certfile, keyfile=keyfile)
        else:
            connSock = sock

        # Default the 'service' connect argument when the caller gave none.
        if 'service' not in self.connectArgs:
            self.connectArgs['service'] = 'qpidd'
        self.conn = Connection(connSock, username=self.authUser, password=self.authPass,
                               mechanism=self.mechanisms, host=self.host,
                               **self.connectArgs)

        def aborted():
            raise Timeout("Waiting for connection to be established with broker")

        # Temporarily install an 'aborted' hook that raises Timeout while
        # the connection is being started; the previous hook is restored
        # once start() returns.
        oldAborted = self.conn.aborted
        self.conn.aborted = aborted
        self.conn.start()

        # Bug (QPID-4337): don't enable non-blocking (timeouts) for old SSL
        if not force_blocking:
            sock.settimeout(oldTimeout)
        self.conn.aborted = oldAborted

        # user_id is either a 2-tuple (second element is kept) or a plain
        # string; anything else leaves saslUser unset.
        uid = self.conn.user_id
        if isinstance(uid, tuple) and len(uid) == 2:
            self.saslUser = uid[1]
        elif isinstance(uid, str):
            self.saslUser = uid
        else:
            self.saslUser = None

        # prevent topic queues from filling up (and causing the agents to
        # disconnect) by discarding the oldest queued messages when full.
        topic_queue_options = {"qpid.policy_type": "ring"}

        self.replyName = "reply-%s" % self.amqpSessionId
        session = self.conn.session(self.amqpSessionId)
        self.amqpSession = session
        session.timeout = self.sessTimeout
        session.auto_sync = True

        def subscribe(queue, destination, callback, message_credit):
            # Subscribe 'destination' to 'queue', route incoming messages to
            # 'callback', and enable window flow control with the maximum
            # byte credit and 'message_credit' messages of credit.
            session.message_subscribe(queue=queue, destination=destination,
                                      accept_mode=session.accept_mode.none,
                                      acquire_mode=session.acquire_mode.pre_acquired)
            session.incoming(destination).listen(callback, self._exceptionCb)
            session.message_set_flow_mode(destination=destination,
                                          flow_mode=session.flow_mode.window)
            session.message_flow(destination=destination,
                                 unit=session.credit_unit.byte, value=0xFFFFFFFF)
            session.message_flow(destination=destination,
                                 unit=session.credit_unit.message, value=message_credit)

        # Reply queue, bound to amq.direct under its own name.
        session.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True)
        session.exchange_bind(exchange="amq.direct",
                              queue=self.replyName, binding_key=self.replyName)
        subscribe(self.replyName, "rdest", self._v1Cb, 200)

        # Topic queue (ring policy, see topic_queue_options above).
        self.topicName = "topic-%s" % self.amqpSessionId
        session.queue_declare(queue=self.topicName, exclusive=True,
                              auto_delete=True,
                              arguments=topic_queue_options)
        subscribe(self.topicName, "tdest", self._v1Cb, 200)

        ##
        ## Check to see if the broker has QMFv2 exchanges configured
        ##
        direct_result = session.exchange_query("qmf.default.direct")
        topic_result = session.exchange_query("qmf.default.topic")
        self.brokerSupportsV2 = not (direct_result.not_found or topic_result.not_found)

        # Register the broker's own agent under key '0'.
        self.cv.acquire()
        try:
            self.agents = {}
            self.brokerAgent = Agent(self, 0, "BrokerAgent", isV2=self.brokerSupportsV2)
            self.agents['0'] = self.brokerAgent
        finally:
            self.cv.release()

        ##
        ## Set up connectivity for QMFv2
        ##
        if self.brokerSupportsV2:
            # set up 3 queues:
            # 1 direct queue - for responses destined to this console.
            # 2 topic queues - one for heartbeats (hb), one for unsolicited data
            # and event indications (ui).
            self.v2_direct_queue = "qmfc-v2-%s" % self.amqpSessionId
            session.queue_declare(queue=self.v2_direct_queue, exclusive=True, auto_delete=True)
            self.v2_topic_queue_ui = "qmfc-v2-ui-%s" % self.amqpSessionId
            session.queue_declare(queue=self.v2_topic_queue_ui,
                                  exclusive=True, auto_delete=True,
                                  arguments=topic_queue_options)
            self.v2_topic_queue_hb = "qmfc-v2-hb-%s" % self.amqpSessionId
            session.queue_declare(queue=self.v2_topic_queue_hb,
                                  exclusive=True, auto_delete=True,
                                  arguments=topic_queue_options)
            session.exchange_bind(exchange="qmf.default.direct",
                                  queue=self.v2_direct_queue,
                                  binding_key=self.v2_direct_queue)
            ## Other bindings here...
            subscribe(self.v2_direct_queue, "v2dest", self._v2Cb, 50)
            subscribe(self.v2_topic_queue_ui, "v2TopicUI", self._v2Cb, 25)
            subscribe(self.v2_topic_queue_hb, "v2TopicHB", self._v2Cb, 100)

        # Send the initial request with opcode 'B'.
        codec = Codec()
        self._setHeader(codec, 'B')
        msg = self._message(codec.encoded)
        self._send(msg)
        return True  # connection complete
    except Exception as e:
        self.error = "Exception during connection setup: %s - %s" % (e.__class__.__name__, e)
        self.conn_exc = e
        # If the failure happened after the socket was opened but before a
        # Connection took ownership of it, nothing else will ever close it.
        if sock is not None and self.conn is None:
            try:
                sock.close()
            except Exception:
                pass
        if self.session.console:
            self.session.console.brokerConnectionFailed(self)
        return False  # connection failed