mysql-connector-python/lib/mysql/connector/aio/plugins/authentication_openid_connect_client.py [42:139]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Name of the authentication plugin class defined in this module.
AUTHENTICATION_PLUGIN_CLASS: str = "MySQLOpenIDConnectAuthPlugin"
# Upper bound for the size of the OpenID Connect token file, in bytes (10 KiB).
OPENID_TOKEN_MAX_SIZE: int = 10 * 1024


class MySQLOpenIDConnectAuthPlugin(MySQLAuthPlugin):
    """Class implementing the MySQL OpenID Connect Authentication Plugin.

    The plugin reads an OpenID Connect ID token (a JWT) from the file given by
    the `openid_token_file` connection argument, checks that it is well formed
    and sends it to the server prefixed by a one-byte capability flag.
    """

    # First field of the auth response packet (int<1> capability flag).
    _openid_capability_flag: bytes = utils.int1store(1)

    @property
    def name(self) -> str:
        """Plugin official name."""
        return "authentication_openid_connect_client"

    @property
    def requires_ssl(self) -> bool:
        """Signals whether or not SSL is required."""
        return True

    @staticmethod
    def _validate_openid_token(token: str) -> bool:
        """Helper method used to validate OpenID Connect token

        The Token is represented as a JSON Web Token (JWT) consists of a
        base64-encoded header, body, and signature, separated by '.' e.g.,
        "Base64url.Base64url.Base64url". The First part of the token contains
        the header, the second part contains payload and the third part contains
        signature. These token parts should be Base64 URLSafe i.e., Token cannot
        contain characters other than a-z, A-Z, 0-9 and special characters '-', '_'.

        Only the shape of the token is checked; the parts are neither decoded
        nor is the signature verified here.

        Args:
            token (str): Base64url-encoded OpenID connect token fetched from
                         the file path passed via `openid_token_file` connection
                         argument.

        Returns:
            bool: Signal indicating whether the token is valid or not.
        """
        header_payload_sig: List[str] = token.split(".")
        if len(header_payload_sig) != 3:
            # invalid structure
            return False
        # `fullmatch` is used on purpose: with `search` and a `$` anchor a part
        # ending in a newline (e.g. "abc\n") would be accepted, because `$` also
        # matches right before a trailing newline. The `+` quantifier rejects
        # empty parts.
        urlsafe_pattern = re.compile(r"[a-zA-Z0-9_-]+")
        return all(
            urlsafe_pattern.fullmatch(token_part) is not None
            for token_part in header_payload_sig
        )

    def auth_response(self, auth_data: bytes, **kwargs: Any) -> bytes:
        """Prepares authentication string for the server.

        Args:
            auth_data: Authorization data (not used by this plugin).
            kwargs: Custom configuration to be passed to the auth plugin
                    when invoked. The `openid_token_file` entry is used as the
                    path of the file holding the OpenID Connect token.

        Returns:
            packet: Client's authorization response.
            The OpenID Connect authorization response follows the pattern :-
            int<1>           capability flag
            string<lenenc>   id token

        Raises:
            InterfaceError: If the connection is insecure or the OpenID Token is
                            too large or invalid.
            ProgrammingError: If the OpenID Token file path is missing or the
                              file does not exist or could not be read.
        """
        # Check if the connection is secure
        if self.requires_ssl and not self._ssl_enabled:
            raise errors.InterfaceError(f"{self.name} requires SSL")

        try:
            # `Path(None)` raises TypeError, so a missing `openid_token_file`
            # argument is reported the same way as an unreadable file.
            token_file_path: str = kwargs.get("openid_token_file", None)
            openid_token_file: Path = Path(token_file_path)
            # Check if token exceeds the maximum size
            if openid_token_file.stat().st_size > OPENID_TOKEN_MAX_SIZE:
                raise errors.InterfaceError(
                    "The OpenID Connect token file size is too large (> 10KB)"
                )
            # Surrounding whitespace (e.g. a final newline) is not part of
            # the token.
            openid_token: str = openid_token_file.read_text(encoding="utf-8").strip()
        except (SyntaxError, TypeError, OSError, UnicodeError) as err:
            raise errors.ProgrammingError(
                "The OpenID Connect Token File (openid_token_file) could not be read"
            ) from err

        # Validate the JWT Token
        if not self._validate_openid_token(openid_token):
            raise errors.InterfaceError("The OpenID Connect Token is invalid")

        # build the auth_response packet; the length prefix is computed from
        # the encoded bytes that are actually sent
        token_bytes: bytes = openid_token.encode()
        auth_response: List[bytes] = [
            self._openid_capability_flag,
            utils.lc_int(len(token_bytes)),
            token_bytes,
        ]
        return b"".join(auth_response)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



mysql-connector-python/lib/mysql/connector/plugins/authentication_openid_connect_client.py [43:140]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Name of the authentication plugin class defined in this module.
AUTHENTICATION_PLUGIN_CLASS: str = "MySQLOpenIDConnectAuthPlugin"
# Upper bound for the size of the OpenID Connect token file, in bytes (10 KiB).
OPENID_TOKEN_MAX_SIZE: int = 10 * 1024


class MySQLOpenIDConnectAuthPlugin(MySQLAuthPlugin):
    """Class implementing the MySQL OpenID Connect Authentication Plugin.

    The plugin reads an OpenID Connect ID token (a JWT) from the file given by
    the `openid_token_file` connection argument, checks that it is well formed
    and sends it to the server prefixed by a one-byte capability flag.
    """

    # First field of the auth response packet (int<1> capability flag).
    _openid_capability_flag: bytes = utils.int1store(1)

    @property
    def name(self) -> str:
        """Plugin official name."""
        return "authentication_openid_connect_client"

    @property
    def requires_ssl(self) -> bool:
        """Signals whether or not SSL is required."""
        return True

    @staticmethod
    def _validate_openid_token(token: str) -> bool:
        """Helper method used to validate OpenID Connect token

        The Token is represented as a JSON Web Token (JWT) consists of a
        base64-encoded header, body, and signature, separated by '.' e.g.,
        "Base64url.Base64url.Base64url". The First part of the token contains
        the header, the second part contains payload and the third part contains
        signature. These token parts should be Base64 URLSafe i.e., Token cannot
        contain characters other than a-z, A-Z, 0-9 and special characters '-', '_'.

        Only the shape of the token is checked; the parts are neither decoded
        nor is the signature verified here.

        Args:
            token (str): Base64url-encoded OpenID connect token fetched from
                         the file path passed via `openid_token_file` connection
                         argument.

        Returns:
            bool: Signal indicating whether the token is valid or not.
        """
        header_payload_sig: List[str] = token.split(".")
        if len(header_payload_sig) != 3:
            # invalid structure
            return False
        # `fullmatch` is used on purpose: with `search` and a `$` anchor a part
        # ending in a newline (e.g. "abc\n") would be accepted, because `$` also
        # matches right before a trailing newline. The `+` quantifier rejects
        # empty parts.
        urlsafe_pattern = re.compile(r"[a-zA-Z0-9_-]+")
        return all(
            urlsafe_pattern.fullmatch(token_part) is not None
            for token_part in header_payload_sig
        )

    def auth_response(self, auth_data: bytes, **kwargs: Any) -> bytes:
        """Prepares authentication string for the server.

        Args:
            auth_data: Authorization data (not used by this plugin).
            kwargs: Custom configuration to be passed to the auth plugin
                    when invoked. The `openid_token_file` entry is used as the
                    path of the file holding the OpenID Connect token.

        Returns:
            packet: Client's authorization response.
            The OpenID Connect authorization response follows the pattern :-
            int<1>           capability flag
            string<lenenc>   id token

        Raises:
            InterfaceError: If the connection is insecure or the OpenID Token is
                            too large or invalid.
            ProgrammingError: If the OpenID Token file path is missing or the
                              file does not exist or could not be read.
        """
        # Check if the connection is secure
        if self.requires_ssl and not self._ssl_enabled:
            raise errors.InterfaceError(f"{self.name} requires SSL")

        try:
            # `Path(None)` raises TypeError, so a missing `openid_token_file`
            # argument is reported the same way as an unreadable file.
            token_file_path: str = kwargs.get("openid_token_file", None)
            openid_token_file: Path = Path(token_file_path)
            # Check if token exceeds the maximum size
            if openid_token_file.stat().st_size > OPENID_TOKEN_MAX_SIZE:
                raise errors.InterfaceError(
                    "The OpenID Connect token file size is too large (> 10KB)"
                )
            # Surrounding whitespace (e.g. a final newline) is not part of
            # the token.
            openid_token: str = openid_token_file.read_text(encoding="utf-8").strip()
        except (SyntaxError, TypeError, OSError, UnicodeError) as err:
            raise errors.ProgrammingError(
                "The OpenID Connect Token File (openid_token_file) could not be read"
            ) from err

        # Validate the JWT Token
        if not self._validate_openid_token(openid_token):
            raise errors.InterfaceError("The OpenID Connect Token is invalid")

        # build the auth_response packet; the length prefix is computed from
        # the encoded bytes that are actually sent
        token_bytes: bytes = openid_token.encode()
        auth_response: List[bytes] = [
            self._openid_capability_flag,
            utils.lc_int(len(token_bytes)),
            token_bytes,
        ]
        return b"".join(auth_response)
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



