mysqlx-connector-python/lib/mysqlx/utils.py (88 lines of code) (raw):

# Copyright (c) 2009, 2024, Oracle and/or its affiliates. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License, version 2.0, as # published by the Free Software Foundation. # # This program is designed to work with certain software (including # but not limited to OpenSSL) that is licensed under separate terms, # as designated in a particular file or component or in included license # documentation. The authors of MySQL hereby grant you an # additional permission to link the program and your derivative works # with the separately licensed software that they have either included with # the program or referenced in the documentation. # # Without limiting anything contained in the foregoing, this file, # which is part of MySQL Connector/Python, is also subject to the # Universal FOSS Exception, version 1.0, a copy of which can be found at # http://oss.oracle.com/licenses/universal-foss-exception. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. # See the GNU General Public License, version 2.0, for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """Utilities.""" import os import subprocess import sys import warnings from typing import Dict, Optional, Tuple from .tls_ciphers import DEPRECATED_TLS_CIPHERSUITES, DEPRECATED_TLS_VERSIONS def _parse_os_release() -> Dict[str, str]: """Parse the contents of /etc/os-release file. Returns: A dictionary containing release information. """ distro: Dict[str, str] = {} os_release_file = os.path.join("/etc", "os-release") if not os.path.exists(os_release_file): return distro with open(os_release_file, encoding="utf-8") as file_obj: for line in file_obj: key_value = line.split("=") if len(key_value) != 2: continue key = key_value[0].lower() value = key_value[1].rstrip("\n").strip('"') distro[key] = value return distro def _parse_lsb_release() -> Dict[str, str]: """Parse the contents of /etc/lsb-release file. Returns: A dictionary containing release information. """ distro = {} lsb_release_file = os.path.join("/etc", "lsb-release") if os.path.exists(lsb_release_file): with open(lsb_release_file, encoding="utf-8") as file_obj: for line in file_obj: key_value = line.split("=") if len(key_value) != 2: continue key = key_value[0].lower() value = key_value[1].rstrip("\n").strip('"') distro[key] = value return distro def _parse_lsb_release_command() -> Optional[Dict[str, str]]: """Parse the output of the lsb_release command. Returns: A dictionary containing release information. """ distro = {} with open(os.devnull, "w", encoding="utf-8") as devnull: try: stdout = subprocess.check_output(("lsb_release", "-a"), stderr=devnull) except OSError: return None lines = stdout.decode(sys.getfilesystemencoding()).splitlines() for line in lines: key_value = line.split(":") if len(key_value) != 2: continue key = key_value[0].replace(" ", "_").lower() value = key_value[1].strip("\t") distro[key] = value return distro def linux_distribution() -> Tuple[str, str, str]: """Tries to determine the name of the Linux OS distribution name. First tries to get information from ``/etc/os-release`` file. If fails, tries to get the information of ``/etc/lsb-release`` file. And finally the information of ``lsb-release`` command. Returns: A tuple with (`name`, `version`, `codename`) """ distro: Optional[Dict[str, str]] = _parse_lsb_release() if distro: return ( distro.get("distrib_id", ""), distro.get("distrib_release", ""), distro.get("distrib_codename", ""), ) distro = _parse_lsb_release_command() if distro: return ( distro.get("distributor_id", ""), distro.get("release", ""), distro.get("codename", ""), ) distro = _parse_os_release() if distro: return ( distro.get("name", ""), distro.get("version_id", ""), distro.get("version_codename", ""), ) return ("", "", "") def warn_ciphersuites_deprecated(cipher_as_ossl: str, tls_version: str) -> None: """Emits a warning if a deprecated cipher is being utilized. Args: cipher: Must be ingested as OpenSSL name. tls_versions: TLS version to check the cipher against. Raises: DeprecationWarning: If the cipher is flagged as deprecated according to the OSSA cipher list. """ if cipher_as_ossl in DEPRECATED_TLS_CIPHERSUITES.get(tls_version, {}).values(): warn_msg = ( f"This connection is using TLS cipher {cipher_as_ossl} which is now " "deprecated and will be removed in a future release of " "MySQL Connector/Python." ) warnings.warn(warn_msg, DeprecationWarning) def warn_tls_version_deprecated(tls_version: str) -> None: """Emits a warning if a deprecated TLS version is being utilized. Args: tls_versions: TLS version to check. Raises: DeprecationWarning: If the TLS version is flagged as deprecated according to the OSSA cipher list. """ if tls_version in DEPRECATED_TLS_VERSIONS: warn_msg = ( f"This connection is using TLS version {tls_version} which is now " "deprecated and will be removed in a future release of " "MySQL Connector/Python." ) warnings.warn(warn_msg, DeprecationWarning)