in aliyun-python-sdk-core/aliyunsdkcore/vendored/requests/packages/urllib3/packages/socks.py [0:0]
def create_connection(dest_pair,
                      timeout=None, source_address=None,
                      proxy_type=None, proxy_addr=None,
                      proxy_port=None, proxy_rdns=True,
                      proxy_username=None, proxy_password=None,
                      socket_options=None):
    """create_connection(dest_pair, *[, timeout], **proxy_args) -> socket object

    Like socket.create_connection(), but connects to proxy
    before returning the socket object.

    Every address that getaddrinfo() reports for (proxy_addr, proxy_port) is
    tried in order; the first socket whose connect() succeeds is returned.

    dest_pair - 2-tuple of (IP/hostname, port). Surrounding IPv6 brackets on
                the host are stripped.
    **proxy_args - Same args passed to socksocket.set_proxy() if present.
                   Surrounding IPv6 brackets on proxy_addr are stripped.
    timeout - Optional socket timeout value, in seconds. Only applied when it
              is an int or a float.
    source_address - tuple (host, port) for the socket to bind to as its source
                     address before connecting (only for compatibility)
    socket_options - Optional iterable of argument tuples; each one is
                     unpacked into sock.setsockopt().

    Raises the last socket.error / ProxyConnectionError seen when no address
    could be connected, or socket.error when getaddrinfo() returned nothing.
    Any other exception propagates immediately; the socket created for that
    attempt is closed before it does.
    """
    # Remove IPv6 brackets on the remote address and proxy address.
    remote_host, remote_port = dest_pair
    if remote_host.startswith("["):
        remote_host = remote_host.strip("[]")
    if proxy_addr and proxy_addr.startswith("["):
        proxy_addr = proxy_addr.strip("[]")

    err = None

    # Allow the SOCKS proxy to be on IPv4 or IPv6 addresses.
    for r in socket.getaddrinfo(proxy_addr, proxy_port, 0, socket.SOCK_STREAM):
        family, socket_type, proto, _canonname, _sa = r
        sock = None
        connected = False
        try:
            sock = socksocket(family, socket_type, proto)

            if socket_options:
                for opt in socket_options:
                    sock.setsockopt(*opt)

            if isinstance(timeout, (int, float)):
                sock.settimeout(timeout)

            if proxy_type:
                sock.set_proxy(proxy_type, proxy_addr, proxy_port, proxy_rdns,
                               proxy_username, proxy_password)
            if source_address:
                sock.bind(source_address)

            sock.connect((remote_host, remote_port))
            connected = True
            return sock

        except (socket.error, ProxyConnectionError) as e:
            # Remember the failure and fall through to the next address.
            err = e

        finally:
            # Close the socket on every failed attempt, including exceptions
            # that are not caught above (e.g. ValueError from settimeout or
            # TypeError from a malformed socket option), so it never leaks.
            if not connected and sock is not None:
                sock.close()

    if err is not None:
        raise err

    raise socket.error("gai returned empty list.")