in mysql-connector-python/lib/mysql/connector/plugins/authentication_ldap_sasl_client.py [0:0]
def _first_message_krb(self) -> Optional[bytes]:
    """Obtain Kerberos credentials and initiate the GSSAPI security context.

    Credentials are first looked up through ``gssapi.Credentials()`` (the
    default credential store). If that lookup fails and a password is
    available, credentials are acquired with the user name and password
    instead. The resulting security context is stored in ``self.ctx`` and
    the service name in ``self.target_name``.

    Returns:
        The initial client token produced by the first ``step()`` of the
        security context. ``step()`` may return ``None``, so the payload
        can be empty.

    Raises:
        InterfaceError: If the stored credentials have expired, if no
            stored credentials are available and no password was given,
            or if the security context cannot be initiated.
        ProgrammingError: If credentials cannot be acquired with the
            given password.
    """
    user_name = gssapi.raw.names.import_name(
        self._username.encode("utf8"), name_type=gssapi.NameType.user
    )

    # The default store is used here; an explicit one would look like
    # store = {'ccache': 'FILE:/tmp/krb5cc_1000',
    #          'keytab': '/etc/some.keytab'}
    # Attempt to retrieve credentials from the default cache file.
    try:
        cred: Any = gssapi.Credentials()
        logger.debug(
            "# Stored credentials found, if password was given it will be ignored."
        )
        try:
            # Reading the attribute is the validation itself: it raises
            # when the stored credentials have expired.
            cred.lifetime
        except gssapi.raw.exceptions.ExpiredCredentialsError as err:
            logger.warning(" Credentials has expired: %s", err)
            # NOTE(review): the result of acquire() is discarded and the
            # error below is raised regardless - confirm this is intended.
            # A GSSError raised by acquire() is handled by the outer
            # ``except`` clause (password fallback).
            cred.acquire(user_name)
            raise InterfaceError(f"Credentials has expired: {err}") from err
    except gssapi.raw.misc.GSSError as err:
        if not self._password:
            raise InterfaceError(
                f"Unable to retrieve stored credentials error: {err}"
            ) from err
        try:
            logger.debug("# Attempt to retrieve credentials with given password")
            acquire_cred_result = gssapi.raw.acquire_cred_with_password(
                user_name,
                self._password.encode("utf8"),
                usage="initiate",
            )
            cred = acquire_cred_result[0]
        except gssapi.raw.misc.GSSError as err2:
            # Chain from the failure actually being reported (err2). The
            # earlier stored-credentials failure normally stays reachable
            # through err2.__context__, so no diagnostic detail is lost.
            raise ProgrammingError(
                f"Unable to retrieve credentials with the given password: {err2}"
            ) from err2

    flags_l = (
        gssapi.RequirementFlag.mutual_authentication,
        gssapi.RequirementFlag.extended_error,
        gssapi.RequirementFlag.delegate_to_peer,
    )

    # Fall back to the default LDAP service principal when none is configured.
    service_principal = self.krb_service_principal or "ldap/ldapauth"
    logger.debug("# service principal: %s", service_principal)
    servk = gssapi.Name(
        service_principal, name_type=gssapi.NameType.kerberos_principal
    )
    self.target_name = servk
    self.ctx = gssapi.SecurityContext(
        name=servk, creds=cred, flags=sum(flags_l), usage="initiate"
    )

    try:
        # step() returns bytes | None, see documentation,
        # so this method could return a NULL payload.
        # ref: https://pythongssapi.github.io/<suffix>
        # suffix: python-gssapi/latest/gssapi.html#gssapi.sec_contexts.SecurityContext
        initial_client_token = self.ctx.step()
    except gssapi.raw.misc.GSSError as err:
        raise InterfaceError(f"Unable to initiate security context: {err}") from err

    logger.debug("# initial client token: %s", initial_client_token)
    return initial_client_token