in stk-sample/lambda/stk-player-events-loader-mysql/package/mysql/connector/network.py [0:0]
def switch_to_ssl(self, ca, cert, key, verify_cert=False,
verify_identity=False, cipher_suites=None,
tls_versions=None):
"""Switch the socket to use SSL"""
if not self.sock:
raise errors.InterfaceError(errno=2048)
try:
if verify_cert:
cert_reqs = ssl.CERT_REQUIRED
elif verify_identity:
cert_reqs = ssl.CERT_OPTIONAL
else:
cert_reqs = ssl.CERT_NONE
if tls_versions is None or not tls_versions:
context = ssl.create_default_context()
if not verify_identity:
context.check_hostname = False
context.options
else:
tls_versions.sort(reverse=True)
tls_version = tls_versions[0]
if not TLS_V1_3_SUPPORTED and \
tls_version == "TLSv1.3" and len(tls_versions) > 1:
tls_version = tls_versions[1]
ssl_protocol = TLS_VERSIONS[tls_version]
context = ssl.SSLContext(ssl_protocol)
if tls_version == "TLSv1.3":
if "TLSv1.2" not in tls_versions:
context.options |= ssl.OP_NO_TLSv1_2
if "TLSv1.1" not in tls_versions:
context.options |= ssl.OP_NO_TLSv1_1
if "TLSv1" not in tls_versions:
context.options |= ssl.OP_NO_TLSv1
context.check_hostname = False
context.verify_mode = cert_reqs
context.load_default_certs()
if ca:
try:
context.load_verify_locations(ca)
except (IOError, ssl.SSLError) as err:
self.sock.close()
raise InterfaceError(
"Invalid CA Certificate: {}".format(err))
if cert:
try:
context.load_cert_chain(cert, key)
except (IOError, ssl.SSLError) as err:
self.sock.close()
raise InterfaceError(
"Invalid Certificate/Key: {}".format(err))
if cipher_suites:
context.set_ciphers(cipher_suites)
if hasattr(self, "server_host"):
self.sock = context.wrap_socket(
self.sock, server_hostname=self.server_host)
else:
self.sock = context.wrap_socket(self.sock)
if verify_identity:
context.check_hostname = True
hostnames = [self.server_host]
if os.name == 'nt' and self.server_host == 'localhost':
hostnames = ['localhost', '127.0.0.1']
aliases = socket.gethostbyaddr(self.server_host)
hostnames.extend([aliases[0]] + aliases[1])
match_found = False
errs = []
for hostname in hostnames:
try:
ssl.match_hostname(self.sock.getpeercert(), hostname)
except ssl.CertificateError as err:
errs.append(str(err))
else:
match_found = True
break
if not match_found:
self.sock.close()
raise InterfaceError("Unable to verify server identity: {}"
"".format(", ".join(errs)))
except NameError:
raise errors.NotSupportedError(
"Python installation has no SSL support")
except (ssl.SSLError, IOError) as err:
raise errors.InterfaceError(
errno=2055, values=(self.get_address(), _strioerror(err)))
except ssl.CertificateError as err:
raise errors.InterfaceError(str(err))
except NotImplementedError as err:
raise errors.InterfaceError(str(err))