in stk-sample/lambda/stk-player-events-loader-mysql/package/mysqlx/connection.py [0:0]
def set_ssl(self, ssl_protos, ssl_mode, ssl_ca, ssl_crl, ssl_cert, ssl_key,
ssl_ciphers):
"""Set SSL parameters.
Args:
ssl_protos (list): SSL protocol to use.
ssl_mode (str): SSL mode.
ssl_ca (str): The certification authority certificate.
ssl_crl (str): The certification revocation lists.
ssl_cert (str): The certificate.
ssl_key (str): The certificate key.
ssl_ciphers (list): SSL ciphersuites to use.
Raises:
:class:`mysqlx.RuntimeError`: If Python installation has no SSL
support.
:class:`mysqlx.InterfaceError`: If the parameters are invalid.
"""
if not SSL_AVAILABLE:
self.close()
raise RuntimeError("Python installation has no SSL support")
if ssl_protos is None or not ssl_protos:
context = ssl.create_default_context()
if ssl_mode != SSLMode.VERIFY_IDENTITY:
context.check_hostname = False
if ssl_mode == SSLMode.REQUIRED:
context.verify_mode = ssl.CERT_NONE
else:
ssl_protos.sort(reverse=True)
tls_version = ssl_protos[0]
if not TLS_V1_3_SUPPORTED and \
tls_version == "TLSv1.3" and len(ssl_protos) > 1:
tls_version = ssl_protos[1]
ssl_protocol = TLS_VERSIONS[tls_version]
context = ssl.SSLContext(ssl_protocol)
if tls_version == "TLSv1.3":
if "TLSv1.2" not in ssl_protos:
context.options |= ssl.OP_NO_TLSv1_2
if "TLSv1.1" not in ssl_protos:
context.options |= ssl.OP_NO_TLSv1_1
if "TLSv1" not in ssl_protos:
context.options |= ssl.OP_NO_TLSv1
if ssl_ca:
try:
context.load_verify_locations(ssl_ca)
context.verify_mode = ssl.CERT_REQUIRED
except (IOError, ssl.SSLError) as err:
self.close()
raise InterfaceError("Invalid CA Certificate: {}".format(err))
if ssl_crl:
try:
context.load_verify_locations(ssl_crl)
context.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
except (IOError, ssl.SSLError) as err:
self.close()
raise InterfaceError("Invalid CRL: {}".format(err))
if ssl_cert:
try:
context.load_cert_chain(ssl_cert, ssl_key)
except (IOError, ssl.SSLError) as err:
self.close()
raise InterfaceError("Invalid Certificate/Key: {}".format(err))
if ssl_ciphers:
context.set_ciphers(":".join(iani_to_openssl_cs_name(ssl_protos[0],
ssl_ciphers)))
try:
self._socket = context.wrap_socket(self._socket,
server_hostname=self._host)
except ssl.CertificateError as err:
raise InterfaceError(str(err))
if ssl_mode == SSLMode.VERIFY_IDENTITY:
context.check_hostname = True
hostnames = []
# Windows does not return loopback aliases on gethostbyaddr
if os.name == 'nt' and (self._host == 'localhost' or \
self._host == '127.0.0.1'):
hostnames = ['localhost', '127.0.0.1']
aliases = socket.gethostbyaddr(self._host)
hostnames.extend([aliases[0]] + aliases[1])
match_found = False
errs = []
for hostname in hostnames:
try:
ssl.match_hostname(self._socket.getpeercert(), hostname)
except ssl.CertificateError as err:
errs.append(str(err))
else:
match_found = True
break
if not match_found:
self.close()
raise InterfaceError("Unable to verify server identity: {}"
"".format(", ".join(errs)))
self._is_ssl = True