def execute_kinit_command()

in SecretsManagerActiveDirectoryRotationSingleUser/lambda_function.py [0:0]


def execute_kinit_command(current_dict, pending_dict, directory_name):
    """
    Executes the kinit command to verify user credentials.
    Args:
        current_dict (dictionary): Dictionary containing current credentials
        pending_dict (dictionary): Dictionary containing pending credentials
        directory_name (string): Directory name used for kinit command
    Returns:
        kinit_creds_successful or raises exception
    Raises:
        ValueError: Raise exception if kinit fails with given credentials
    """

    if pending_dict is not None:
        # First try to log in with the AWSPENDING password and if that is
        # successful, immediately return.
        with tempfile.NamedTemporaryFile(dir="/tmp", delete=True) as cache:
            username, password = check_inputs(pending_dict)
            try:
                proc = subprocess.Popen(
                    [
                        "./kinit",
                        "-c",
                        cache.name,
                        "%s@%s" % (username, directory_name.upper()),
                    ],
                    stdin=subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    encoding="utf-8",
                    shell=False,
                )
                output, error = proc.communicate(input="%s\n" % password, timeout=15)
                if error is not None or proc.returncode != 0:
                    raise ValueError(
                        "kinit failed %d %s %s" % (proc.returncode, error, output)
                    )
                return KINIT_PENDING_CREDS_SUCCESSFUL
            except:
                # If Pending secret does not authenticate, we can proceed to
                # current secret.
                logger.info(
                    "execute_kinit_command: Proceed to current secret since "
                    "pending secret "
                    "does not authenticate"
                )

    if current_dict is None:
        logger.error("execute_kinit_command: Unexpected value for current_dict")
        raise ValueError("execute_kinit_command: Unexpected value for current_dict")

    with tempfile.NamedTemporaryFile(dir="/tmp", delete=True) as cache:
        try:
            username, password = check_inputs(current_dict)
            proc = subprocess.Popen(
                [
                    "./kinit",
                    "-c",
                    cache.name,
                    "%s@%s" % (username, directory_name.upper()),
                ],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                encoding="utf-8",
                shell=False,
            )
            output, error = proc.communicate(input="%s\n" % password, timeout=15)
            if error is not None or proc.returncode != 0:
                raise ValueError(
                    "kinit failed %d %s %s" % (proc.returncode, error, output)
                )
            return KINIT_CURRENT_CREDS_SUCCESSFUL
        except Exception:
            logger.error("execute_kinit_command: kinit failed")
            raise ValueError("execute_kinit_command: kinit failed") from Exception