def lambda_handler()

in SecretsManagerActiveDirectoryRotationSingleUser/lambda_function.py [0:0]


def lambda_handler(event, context):
    """
    Rotates a password for a Directory Services user account. This is the
    main lambda entry point.

    The handler checks that the secret is enabled for rotation, resolves the
    directory name from the directory id stored in the AWSCURRENT secret,
    checks that the requested version is staged as AWSPENDING and then
    dispatches to the function implementing the requested rotation step.

    Args:
        event (dict): Lambda dictionary of event parameters. These keys must
        include the following:
            - SecretId: The secret ARN or identifier
            - ClientRequestToken: The ClientRequestToken of the secret version
            - Step: The rotation step (one of createSecret, setSecret,
            testSecret, or finishSecret)
        context (LambdaContext): The Lambda runtime information
    Returns:
        None. When the version is already staged as AWSCURRENT the handler
        returns without running any rotation step.
    Raises:
        ResourceNotFoundException: If the secret with the specified arn and
        stage does not exist
        ValueError: If the secret is not properly configured for rotation,
        if no directory description is returned for the directory stored in
        the secret, if the pending secret does not match the current one, or
        if the step is unknown
        KeyError: If the event parameters do not contain the expected keys,
        or if the SECRETS_MANAGER_ENDPOINT environment variable is not set
        Exceptions from ds.describe_directories :
            DirectoryService.Client.exceptions.EntityDoesNotExistException
            DirectoryService.Client.exceptions.InvalidParameterException
            DirectoryService.Client.exceptions.InvalidNextTokenException
            DirectoryService.Client.exceptions.ClientException
            DirectoryService.Client.exceptions.ServiceException
    """
    arn = event["SecretId"]
    token = event["ClientRequestToken"]
    step = event["Step"]

    # Put the packaged kerberos libraries first on the library search path.
    # Python does not expand "$LD_LIBRARY_PATH" inside a string, so the
    # inherited value is read explicitly. Earlier "./" entries (and the
    # unexpanded placeholder) are dropped so that repeated invocations in the
    # same process do not keep growing the variable.
    inherited_path = os.environ.get("LD_LIBRARY_PATH", "")
    kept_entries = [
        entry
        for entry in inherited_path.split(":")
        if entry and entry not in ("./", "$LD_LIBRARY_PATH")
    ]
    os.environ["LD_LIBRARY_PATH"] = ":".join(["./"] + kept_entries)

    # Setup the clients
    secrets_manager_client = boto3.client(
        "secretsmanager", endpoint_url=os.environ["SECRETS_MANAGER_ENDPOINT"]
    )
    directory_services_client = boto3.client("ds")

    # Make sure rotation is enabled for the secret
    metadata = secrets_manager_client.describe_secret(SecretId=arn)
    if "RotationEnabled" in metadata and not metadata["RotationEnabled"]:
        message = "Secret %s is not enabled for rotation" % arn
        logger.error(message)
        raise ValueError(message)

    # Resolve the directory name from the directory id kept in the secret
    current_dict = get_secret_dict(secrets_manager_client, arn, "AWSCURRENT")
    directory_name_list = [current_dict[DICT_KEY_DIRECTORY]]
    directory_info = directory_services_client.describe_directories(
        DirectoryIds=directory_name_list, Limit=1
    )
    directory_descriptions = directory_info.get("DirectoryDescriptions") or []
    if not directory_descriptions:
        # Fail with a meaningful error instead of an IndexError below.
        message = "No directory description found for directory list %s" % (
            directory_name_list,
        )
        logger.error(message)
        raise ValueError(message)
    directory_name = directory_descriptions[0]["Name"]

    # Make sure the version is staged correctly
    versions = metadata["VersionIdsToStages"]
    if token not in versions:
        message = "Secret version %s has no stage for rotation of secret %s." % (
            token,
            arn,
        )
        logger.error(message)
        raise ValueError(message)
    if "AWSCURRENT" in versions[token]:
        # Nothing left to rotate: this version is already the live one.
        logger.info(
            "Secret version %s already set as AWSCURRENT for secret %s." % (token, arn)
        )
        return
    elif "AWSPENDING" not in versions[token]:
        message = (
            "Secret version %s not set as AWSPENDING for rotation of secret %s."
            % (token, arn)
        )
        logger.error(message)
        raise ValueError(message)

    # Call the appropriate step
    if step == "createSecret":
        create_secret(secrets_manager_client, arn, token, directory_name, current_dict)
    elif step == "setSecret":
        # Get the pending secret and update password in Directory Services
        pending_dict = get_secret_dict(secrets_manager_client, arn, "AWSPENDING", token)

        # The pending secret must target the same user as the current one.
        if current_dict[DICT_KEY_USERNAME] != pending_dict[DICT_KEY_USERNAME]:
            message = (
                "Username %s in current dict does not match username %s in "
                "pending dict"
                % (current_dict[DICT_KEY_USERNAME], pending_dict[DICT_KEY_USERNAME])
            )
            logger.error(message)
            raise ValueError(message)

        # The pending secret must target the same directory as the current one.
        pending_directory_name_list = [pending_dict[DICT_KEY_DIRECTORY]]
        if pending_directory_name_list != directory_name_list:
            message = (
                "Current directory name list %s does not match pending "
                "directory name list %s"
                % (directory_name_list, pending_directory_name_list)
            )
            logger.error(message)
            raise ValueError(message)

        set_secret(
            directory_services_client,
            directory_name,
            current_dict,
            pending_dict,
        )
    elif step == "testSecret":
        pending_dict = get_secret_dict(secrets_manager_client, arn, "AWSPENDING", token)
        test_secret(directory_name, pending_dict)
    elif step == "finishSecret":
        finish_secret(secrets_manager_client, arn, token)
    else:
        # The log line carries a "lambda_handler: " prefix that the raised
        # message does not, so the two strings are kept separate here.
        logger.error(
            "lambda_handler: Invalid step parameter %s for secret %s" % (step, arn)
        )
        raise ValueError("Invalid step parameter %s for secret %s" % (step, arn))