in SecretsManagerActiveDirectoryRotationSingleUser/lambda_function.py [0:0]
def lambda_handler(event, context):
    """
    Rotates a password for a Directory Services user account. This is the
    main lambda entry point.

    The staging of the requested version is validated immediately after
    ``describe_secret``, before the secret value is fetched or Directory
    Service is queried, so that an already-finished rotation (version is
    AWSCURRENT) returns without further remote calls and a mis-staged
    version is rejected up front.

    Args:
        event (dict): Lambda dictionary of event parameters. These keys must
            include the following:
            - SecretId: The secret ARN or identifier
            - ClientRequestToken: The ClientRequestToken of the secret version
            - Step: The rotation step (one of createSecret, setSecret,
              testSecret, or finishSecret)
        context (LambdaContext): The Lambda runtime information (not used by
            this handler)

    Returns:
        None

    Raises:
        ResourceNotFoundException: If the secret with the specified arn and
            stage does not exist
        ValueError: If the secret is not properly configured for rotation,
            the version is not staged for rotation, the pending secret does
            not match the current username/directory, or the step is invalid
        KeyError: If the event parameters do not contain the expected keys
        Exceptions from ds.describe_directories :
            DirectoryService.Client.exceptions.EntityDoesNotExistException
            DirectoryService.Client.exceptions.InvalidParameterException
            DirectoryService.Client.exceptions.InvalidNextTokenException
            DirectoryService.Client.exceptions.ClientException
            DirectoryService.Client.exceptions.ServiceException
    """
    arn = event["SecretId"]
    token = event["ClientRequestToken"]
    step = event["Step"]

    # To use only the packaged kerberos libraries.
    # NOTE(review): Python does not expand "$LD_LIBRARY_PATH", so the variable
    # is set to this literal text. Left unchanged on purpose; confirm whether
    # prepending to the existing value was intended.
    os.environ["LD_LIBRARY_PATH"] = "./:$LD_LIBRARY_PATH"

    # Setup the clients
    secrets_manager_client = boto3.client(
        "secretsmanager", endpoint_url=os.environ["SECRETS_MANAGER_ENDPOINT"]
    )
    directory_services_client = boto3.client("ds")

    # Make sure rotation is enabled and the version is staged correctly
    # before doing any further remote work.
    metadata = secrets_manager_client.describe_secret(SecretId=arn)
    if "RotationEnabled" in metadata and not metadata["RotationEnabled"]:
        message = "Secret %s is not enabled for rotation" % arn
        logger.error(message)
        raise ValueError(message)

    versions = metadata["VersionIdsToStages"]
    if token not in versions:
        message = "Secret version %s has no stage for rotation of secret %s." % (
            token,
            arn,
        )
        logger.error(message)
        raise ValueError(message)

    token_stages = versions[token]
    if "AWSCURRENT" in token_stages:
        # Nothing left to rotate for this token; treat the call as a no-op.
        logger.info(
            "Secret version %s already set as AWSCURRENT for secret %s." % (token, arn)
        )
        return
    if "AWSPENDING" not in token_stages:
        message = (
            "Secret version %s not set as AWSPENDING for rotation of secret %s."
            % (token, arn)
        )
        logger.error(message)
        raise ValueError(message)

    # Resolve the directory referenced by the current secret.
    current_dict = get_secret_dict(secrets_manager_client, arn, "AWSCURRENT")
    directory_name_list = [current_dict[DICT_KEY_DIRECTORY]]
    directory_info = directory_services_client.describe_directories(
        DirectoryIds=directory_name_list, Limit=1
    )
    directory_description = directory_info["DirectoryDescriptions"][0]
    directory_name = directory_description["Name"]

    # Call the appropriate step
    if step == "createSecret":
        create_secret(secrets_manager_client, arn, token, directory_name, current_dict)
    elif step == "setSecret":
        # Get the pending secret and update password in Directory Services
        pending_dict = get_secret_dict(secrets_manager_client, arn, "AWSPENDING", token)

        # The pending version must target the same user as the current one.
        if current_dict[DICT_KEY_USERNAME] != pending_dict[DICT_KEY_USERNAME]:
            message = (
                "Username %s in current dict does not match username %s in "
                "pending dict"
                % (current_dict[DICT_KEY_USERNAME], pending_dict[DICT_KEY_USERNAME])
            )
            logger.error(message)
            raise ValueError(message)

        # ...and the same directory.
        pending_directory_name_list = [pending_dict[DICT_KEY_DIRECTORY]]
        if pending_directory_name_list != directory_name_list:
            message = (
                "Current directory name list %s does not match pending "
                "directory name list %s"
                % (directory_name_list, pending_directory_name_list)
            )
            logger.error(message)
            raise ValueError(message)

        set_secret(
            directory_services_client,
            directory_name,
            current_dict,
            pending_dict,
        )
    elif step == "testSecret":
        pending_dict = get_secret_dict(secrets_manager_client, arn, "AWSPENDING", token)
        test_secret(directory_name, pending_dict)
    elif step == "finishSecret":
        finish_secret(secrets_manager_client, arn, token)
    else:
        # The logged text carries a "lambda_handler: " prefix that the raised
        # message does not; both are kept exactly as before.
        logger.error(
            "lambda_handler: Invalid step parameter %s for secret %s" % (step, arn)
        )
        raise ValueError("Invalid step parameter %s for secret %s" % (step, arn))