in SecretsManagerActiveDirectoryRotationSingleUser/lambda_function.py [0:0]
def _run_kinit(cache_path, username, password, directory_name):
    """
    Runs ./kinit once for a single principal and raises if it does not succeed.
    Args:
        cache_path (string): Path handed to kinit through its -c option
        username (string): User name part of the principal
        password (string): Password written to kinit's stdin
        directory_name (string): Directory name, upper-cased to form the realm
    Raises:
        ValueError: If kinit exits with a non-zero return code
        subprocess.TimeoutExpired: If kinit does not finish within 15 seconds
    """
    proc = subprocess.Popen(
        [
            "./kinit",
            "-c",
            cache_path,
            "%s@%s" % (username, directory_name.upper()),
        ],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        encoding="utf-8",
        shell=False,
    )
    try:
        output, error = proc.communicate(input="%s\n" % password, timeout=15)
    except subprocess.TimeoutExpired:
        # communicate() does not kill the child when the timeout expires, so
        # terminate and reap it here to avoid leaving a stray kinit process.
        proc.kill()
        proc.communicate()
        raise
    # stderr is not piped, so ``error`` is None here; the return code is what
    # actually decides success.
    if error is not None or proc.returncode != 0:
        raise ValueError("kinit failed %d %s %s" % (proc.returncode, error, output))


def execute_kinit_command(current_dict, pending_dict, directory_name):
    """
    Executes the kinit command to verify user credentials.

    The pending credentials (when supplied) are tried first; if they do not
    authenticate, the current credentials are tried.
    Args:
        current_dict (dictionary): Dictionary containing current credentials
        pending_dict (dictionary): Dictionary containing pending credentials
        directory_name (string): Directory name used for kinit command
    Returns:
        KINIT_PENDING_CREDS_SUCCESSFUL if the pending credentials authenticate,
        otherwise KINIT_CURRENT_CREDS_SUCCESSFUL if the current ones do
    Raises:
        ValueError: Raise exception if kinit fails with given credentials, or
            if current_dict is None when it is needed
    """
    if pending_dict is not None:
        # First try to log in with the AWSPENDING password and if that is
        # successful, immediately return.
        with tempfile.NamedTemporaryFile(dir="/tmp", delete=True) as cache:
            # Kept outside the try block: an error raised by check_inputs for
            # the pending secret propagates to the caller instead of silently
            # falling back to the current secret.
            username, password = check_inputs(pending_dict)
            try:
                _run_kinit(cache.name, username, password, directory_name)
                return KINIT_PENDING_CREDS_SUCCESSFUL
            except Exception:
                # If Pending secret does not authenticate, we can proceed to
                # current secret. Only Exception is caught so that
                # KeyboardInterrupt / SystemExit are not swallowed.
                logger.info(
                    "execute_kinit_command: Proceed to current secret since "
                    "pending secret "
                    "does not authenticate"
                )

    if current_dict is None:
        logger.error("execute_kinit_command: Unexpected value for current_dict")
        raise ValueError("execute_kinit_command: Unexpected value for current_dict")

    with tempfile.NamedTemporaryFile(dir="/tmp", delete=True) as cache:
        try:
            username, password = check_inputs(current_dict)
            _run_kinit(cache.name, username, password, directory_name)
            return KINIT_CURRENT_CREDS_SUCCESSFUL
        except Exception as err:
            logger.error("execute_kinit_command: kinit failed")
            # Chain from the caught exception instance so the real cause is
            # preserved in the traceback.
            raise ValueError("execute_kinit_command: kinit failed") from err