in qpid/testlib.py [0:0]
def connect(self, host=None, port=None):
url = self.broker
if url.scheme == URL.AMQPS:
default_port = self.DEFAULT_PORT_TLS
else:
default_port = self.DEFAULT_PORT
try:
sock = connect(host or url.host, port or url.port or default_port)
except socket.error as e:
raise Skipped(e)
if url.scheme == URL.AMQPS:
sock = ssl(sock)
conn = Connection(sock, username=url.user or self.DEFAULT_USERNAME,
password=url.password or self.DEFAULT_PASSWORD)
try:
conn.start(timeout=10)
except VersionError as e:
raise Skipped(e)
return conn