in client/python/polaris/catalog/rest.py [0:0]
def __init__(self, configuration) -> None:
# urllib3.PoolManager will pass all kw parameters to connectionpool
# https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75 # noqa: E501
# https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680 # noqa: E501
# Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501
# cert_reqs
if configuration.verify_ssl:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
pool_args = {
"cert_reqs": cert_reqs,
"ca_certs": configuration.ssl_ca_cert,
"cert_file": configuration.cert_file,
"key_file": configuration.key_file,
}
if configuration.assert_hostname is not None:
pool_args['assert_hostname'] = (
configuration.assert_hostname
)
if configuration.retries is not None:
pool_args['retries'] = configuration.retries
if configuration.tls_server_name:
pool_args['server_hostname'] = configuration.tls_server_name
if configuration.socket_options is not None:
pool_args['socket_options'] = configuration.socket_options
if configuration.connection_pool_maxsize is not None:
pool_args['maxsize'] = configuration.connection_pool_maxsize
# https pool manager
self.pool_manager: urllib3.PoolManager
if configuration.proxy:
if is_socks_proxy_url(configuration.proxy):
from urllib3.contrib.socks import SOCKSProxyManager
pool_args["proxy_url"] = configuration.proxy
pool_args["headers"] = configuration.proxy_headers
self.pool_manager = SOCKSProxyManager(**pool_args)
else:
pool_args["proxy_url"] = configuration.proxy
pool_args["proxy_headers"] = configuration.proxy_headers
self.pool_manager = urllib3.ProxyManager(**pool_args)
else:
self.pool_manager = urllib3.PoolManager(**pool_args)