def switch_to_ssl()

in stk-sample/lambda/stk-player-events-loader-mysql/package/mysql/connector/network.py [0:0]


    def switch_to_ssl(self, ca, cert, key, verify_cert=False,
                      verify_identity=False, cipher_suites=None,
                      tls_versions=None):
        """Switch the socket to use SSL"""
        if not self.sock:
            raise errors.InterfaceError(errno=2048)

        try:
            if verify_cert:
                cert_reqs = ssl.CERT_REQUIRED
            elif verify_identity:
                cert_reqs = ssl.CERT_OPTIONAL
            else:
                cert_reqs = ssl.CERT_NONE

            if tls_versions is None or not tls_versions:
                context = ssl.create_default_context()
                if not verify_identity:
                    context.check_hostname = False
                context.options
            else:
                tls_versions.sort(reverse=True)

                tls_version = tls_versions[0]
                if not TLS_V1_3_SUPPORTED and \
                   tls_version == "TLSv1.3" and len(tls_versions) > 1:
                    tls_version = tls_versions[1]
                ssl_protocol = TLS_VERSIONS[tls_version]
                context = ssl.SSLContext(ssl_protocol)

                if tls_version == "TLSv1.3":
                    if "TLSv1.2" not in tls_versions:
                        context.options |= ssl.OP_NO_TLSv1_2
                    if "TLSv1.1" not in tls_versions:
                        context.options |= ssl.OP_NO_TLSv1_1
                    if "TLSv1" not in tls_versions:
                        context.options |= ssl.OP_NO_TLSv1

            context.check_hostname = False
            context.verify_mode = cert_reqs
            context.load_default_certs()

            if ca:
                try:
                    context.load_verify_locations(ca)
                except (IOError, ssl.SSLError) as err:
                    self.sock.close()
                    raise InterfaceError(
                        "Invalid CA Certificate: {}".format(err))
            if cert:
                try:
                    context.load_cert_chain(cert, key)
                except (IOError, ssl.SSLError) as err:
                    self.sock.close()
                    raise InterfaceError(
                        "Invalid Certificate/Key: {}".format(err))
            if cipher_suites:
                context.set_ciphers(cipher_suites)

            if hasattr(self, "server_host"):
                self.sock = context.wrap_socket(
                    self.sock, server_hostname=self.server_host)
            else:
                self.sock = context.wrap_socket(self.sock)

            if verify_identity:
                context.check_hostname = True
                hostnames = [self.server_host]
                if os.name == 'nt' and self.server_host == 'localhost':
                    hostnames = ['localhost', '127.0.0.1']
                    aliases = socket.gethostbyaddr(self.server_host)
                    hostnames.extend([aliases[0]] + aliases[1])
                match_found = False
                errs = []
                for hostname in hostnames:
                    try:
                        ssl.match_hostname(self.sock.getpeercert(), hostname)
                    except ssl.CertificateError as err:
                        errs.append(str(err))
                    else:
                        match_found = True
                        break
                if not match_found:
                    self.sock.close()
                    raise InterfaceError("Unable to verify server identity: {}"
                                         "".format(", ".join(errs)))
        except NameError:
            raise errors.NotSupportedError(
                "Python installation has no SSL support")
        except (ssl.SSLError, IOError) as err:
            raise errors.InterfaceError(
                errno=2055, values=(self.get_address(), _strioerror(err)))
        except ssl.CertificateError as err:
            raise errors.InterfaceError(str(err))
        except NotImplementedError as err:
            raise errors.InterfaceError(str(err))