def set_ssl()

in stk-sample/lambda/stk-player-events-loader-mysql/package/mysqlx/connection.py [0:0]


    def set_ssl(self, ssl_protos, ssl_mode, ssl_ca, ssl_crl, ssl_cert, ssl_key,
                ssl_ciphers):
        """Set SSL parameters and wrap the current socket with TLS.

        Builds an ``ssl.SSLContext`` from the given options, replaces
        ``self._socket`` with the wrapped socket and sets ``self._is_ssl``
        to ``True`` on success. On most failures the stream is closed via
        ``self.close()`` before the exception is raised.

        Args:
            ssl_protos (list): SSL protocol to use. When ``None`` or empty,
                               the context returned by
                               ``ssl.create_default_context()`` is used.
                               The sequence is not modified.
            ssl_mode (str): SSL mode.
            ssl_ca (str): The certification authority certificate.
            ssl_crl (str): The certification revocation lists.
            ssl_cert (str): The certificate.
            ssl_key (str): The certificate key.
            ssl_ciphers (list): SSL ciphersuites to use. Requires
                                ``ssl_protos`` to be non-empty.

        Raises:
            :class:`mysqlx.RuntimeError`: If Python installation has no SSL
                                          support.
            :class:`mysqlx.InterfaceError`: If the parameters are invalid:
                the CA, CRL or certificate/key cannot be loaded, ciphersuites
                are given without any SSL protocol, the TLS handshake raises
                a certificate error, or the server identity cannot be
                verified.
        """
        if not SSL_AVAILABLE:
            self.close()
            raise RuntimeError("Python installation has no SSL support")

        # Sort a copy so the caller's list keeps its order; a reverse sort
        # places the highest protocol name first ("TLSv1.3" > "TLSv1.2" > ...).
        tls_versions = sorted(ssl_protos, reverse=True) if ssl_protos else []

        if not tls_versions:
            context = ssl.create_default_context()
            if ssl_mode != SSLMode.VERIFY_IDENTITY:
                context.check_hostname = False
            if ssl_mode == SSLMode.REQUIRED:
                context.verify_mode = ssl.CERT_NONE
        else:
            tls_version = tls_versions[0]
            # Fall back to the next requested version when TLSv1.3 is not
            # supported and an alternative was requested.
            if not TLS_V1_3_SUPPORTED and \
               tls_version == "TLSv1.3" and len(tls_versions) > 1:
                tls_version = tls_versions[1]
            ssl_protocol = TLS_VERSIONS[tls_version]
            context = ssl.SSLContext(ssl_protocol)

            if tls_version == "TLSv1.3":
                # Disable every older protocol that was not requested.
                if "TLSv1.2" not in tls_versions:
                    context.options |= ssl.OP_NO_TLSv1_2
                if "TLSv1.1" not in tls_versions:
                    context.options |= ssl.OP_NO_TLSv1_1
                if "TLSv1" not in tls_versions:
                    context.options |= ssl.OP_NO_TLSv1

        if ssl_ca:
            try:
                context.load_verify_locations(ssl_ca)
                context.verify_mode = ssl.CERT_REQUIRED
            except (IOError, ssl.SSLError) as err:
                self.close()
                raise InterfaceError("Invalid CA Certificate: {}".format(err))

        if ssl_crl:
            try:
                context.load_verify_locations(ssl_crl)
                context.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
            except (IOError, ssl.SSLError) as err:
                self.close()
                raise InterfaceError("Invalid CRL: {}".format(err))

        if ssl_cert:
            try:
                context.load_cert_chain(ssl_cert, ssl_key)
            except (IOError, ssl.SSLError) as err:
                self.close()
                raise InterfaceError("Invalid Certificate/Key: {}".format(err))

        if ssl_ciphers:
            if not tls_versions:
                # The cipher name translation needs a TLS version; without
                # one the original code failed with a bare TypeError or
                # IndexError on ``ssl_protos[0]``.
                self.close()
                raise InterfaceError("SSL protocols must be specified when "
                                     "SSL ciphersuites are given")
            context.set_ciphers(":".join(
                iani_to_openssl_cs_name(tls_versions[0], ssl_ciphers)))

        try:
            self._socket = context.wrap_socket(self._socket,
                                               server_hostname=self._host)
        except ssl.CertificateError as err:
            raise InterfaceError(str(err))

        if ssl_mode == SSLMode.VERIFY_IDENTITY:
            context.check_hostname = True
            hostnames = []
            # Windows does not return loopback aliases on gethostbyaddr
            if os.name == 'nt' and self._host in ('localhost', '127.0.0.1'):
                hostnames = ['localhost', '127.0.0.1']
            aliases = socket.gethostbyaddr(self._host)
            # gethostbyaddr returns (hostname, aliaslist, ipaddrlist); try the
            # primary name first, then every alias.
            hostnames.extend([aliases[0]] + aliases[1])
            # The peer certificate does not change between candidates.
            peer_cert = self._socket.getpeercert()
            match_found = False
            errs = []
            for hostname in hostnames:
                try:
                    # NOTE(review): ssl.match_hostname is deprecated since
                    # Python 3.7 and removed in Python 3.12 - confirm the
                    # supported interpreter range before upgrading.
                    ssl.match_hostname(peer_cert, hostname)
                except ssl.CertificateError as err:
                    errs.append(str(err))
                else:
                    match_found = True
                    break
            if not match_found:
                self.close()
                raise InterfaceError("Unable to verify server identity: {}"
                                     "".format(", ".join(errs)))

        self._is_ssl = True