def _first_message_krb()

in mysql-connector-python/lib/mysql/connector/aio/plugins/authentication_ldap_sasl_client.py [0:0]


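    def _first_message_krb(self) -> Optional[bytes]:
        """Obtain Kerberos credentials and initiate the GSSAPI security context.

        Credentials are first looked up in the default GSSAPI credential
        store.  If that lookup fails and a password was supplied, they are
        acquired with the password instead.  A security context for the
        LDAP service principal is then created and stepped once.

        Returns:
            The initial client token produced by ``SecurityContext.step()``,
            or ``None`` when the mechanism produced no output token.

        Raises:
            InterfaceError: stored credentials are expired, or unavailable
                and no password was given, or the security context could not
                be initiated.
            ProgrammingError: credentials could not be acquired with the
                given password.
        """
        user_name = gssapi.raw.names.import_name(
            self._username.encode("utf8"), name_type=gssapi.NameType.user
        )

        # No explicit store (e.g. a ``{'ccache': ..., 'keytab': ...}`` mapping)
        # is passed, so gssapi.Credentials() resolves the default store.
        try:
            cred: Any = gssapi.Credentials()
            logger.debug(
                "# Stored credentials found, if password was given it will be ignored."
            )
            try:
                # Reading .lifetime is only used to trigger
                # ExpiredCredentialsError; the value itself is not needed.
                cred.lifetime
            except gssapi.raw.exceptions.ExpiredCredentialsError as err:
                logger.warning(" Credentials has expired: %s", err)
                # NOTE(review): the result of this acquire() is discarded and
                # an error is raised right after it; confirm whether the
                # re-acquire is intentional.
                cred.acquire(user_name)
                raise InterfaceError(f"Credentials has expired: {err}") from err
        except gssapi.raw.misc.GSSError as err:
            if not self._password:
                raise InterfaceError(
                    f"Unable to retrieve stored credentials error: {err}"
                ) from err
            try:
                logger.debug("# Attempt to retrieve credentials with given password")
                acquire_cred_result = gssapi.raw.acquire_cred_with_password(
                    user_name,
                    self._password.encode("utf8"),
                    usage="initiate",
                )
                cred = acquire_cred_result[0]
            except gssapi.raw.misc.GSSError as err2:
                # Chain from err2 (the password-acquire failure) rather than
                # the earlier store-lookup failure, so the traceback shows the
                # actual cause instead of suppressing it.
                raise ProgrammingError(
                    f"Unable to retrieve credentials with the given password: {err2}"
                ) from err2

        # NOTE(review): delegate_to_peer requests that the client's credentials
        # be forwarded to the service (credential delegation); keep it only if
        # the deployment actually relies on it.
        flags_l = (
            gssapi.RequirementFlag.mutual_authentication,
            gssapi.RequirementFlag.extended_error,
            gssapi.RequirementFlag.delegate_to_peer,
        )

        # Fall back to a fixed service principal when none was configured.
        if self.krb_service_principal:
            service_principal = self.krb_service_principal
        else:
            service_principal = "ldap/ldapauth"
        logger.debug("# service principal: %s", service_principal)
        servk = gssapi.Name(
            service_principal, name_type=gssapi.NameType.kerberos_principal
        )
        self.target_name = servk
        # flags are an int-valued enum, so summing them builds the flag mask.
        self.ctx = gssapi.SecurityContext(
            name=servk, creds=cred, flags=sum(flags_l), usage="initiate"
        )

        try:
            # step() returns bytes | None, see documentation,
            # so this method could return a NULL payload.
            # ref: https://pythongssapi.github.io/<suffix>
            # suffix: python-gssapi/latest/gssapi.html#gssapi.sec_contexts.SecurityContext
            initial_client_token = self.ctx.step()
        except gssapi.raw.misc.GSSError as err:
            raise InterfaceError(f"Unable to initiate security context: {err}") from err

        # NOTE(review): this writes the raw token bytes to the debug log;
        # confirm that is acceptable for the deployment's log handling.
        logger.debug("# initial client token: %s", initial_client_token)
        return initial_client_token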