in functions/source/rotate_sealer_key_secret/lambda_function.py [0:0]
def finish_secret(service_client, arn: str, token: str) -> None:
    """Finish the secret

    This method finalizes the rotation process by marking the secret version
    passed in as the AWSCURRENT secret. It then removes a staging label from
    each version beyond the limit configured in the SEALER_KEY_VERSION_COUNT
    environment variable and re-assigns the "AWSPREVIOUS-CUSTOM_<n>" staging
    labels across the retained versions.

    If the version identified by ``token`` already carries AWSCURRENT, this is
    logged and the method returns without modifying the secret.

    Args:
        service_client (client): The secrets manager service client
        arn (string): The secret ARN or other identifier
        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ResourceNotFoundException: If the secret with the specified arn does not exist
        KeyError: If SEALER_KEY_VERSION_COUNT is not set (only reached when a rotation is actually performed)
        ValueError: If SEALER_KEY_VERSION_COUNT is not an integer
    """
    metadata = service_client.describe_secret(SecretId=arn)
    version_stages = metadata["VersionIdsToStages"]
    # sort_by_tag is defined elsewhere in this file; the resulting order
    # drives both the pruning and the label bumping below.
    sorted_versions = sorted(version_stages.items(), key=sort_by_tag)
    logger.info("finishSecret: initial versions %s" % pformat(sorted_versions))

    # Find the version currently holding AWSCURRENT. The description fetched
    # above is reused: nothing has modified the secret since that call.
    current_version = None
    for version, stages in version_stages.items():
        if "AWSCURRENT" not in stages:
            continue
        if version == token:
            # The correct version is already marked as current, return
            logger.info("finishSecret: Version %s already marked as AWSCURRENT for %s" % (version, arn))
            return
        current_version = version
        break

    # Finalize by staging the secret version current. RemoveFromVersionId is
    # only supplied when some version actually holds AWSCURRENT: passing None
    # fails boto3 parameter validation before the request is even sent.
    promote_args = {
        "SecretId": arn,
        "VersionStage": "AWSCURRENT",
        "MoveToVersionId": token,
    }
    if current_version is not None:
        promote_args["RemoveFromVersionId"] = current_version
    service_client.update_secret_version_stage(**promote_args)
    logger.info("finishSecret: Successfully set AWSCURRENT stage to version %s for secret %s. Removed AWSCURRENT from version %s." % (token, arn, current_version))

    # Bump all the previous versions as required by the configuration.
    # Re-read the secret so the ordering reflects the promotion just made.
    metadata = service_client.describe_secret(SecretId=arn)
    sorted_versions = sorted(metadata["VersionIdsToStages"].items(), key=sort_by_tag)
    max_versions = int(os.environ['SEALER_KEY_VERSION_COUNT'])

    # Drop entries from the tail of the sort order until at most
    # max_versions remain.
    # NOTE(review): only the first staging label of the pruned version is
    # removed; a version carrying several labels keeps the others - confirm
    # that this is intended.
    while len(sorted_versions) > max_versions:
        prune_version_id, prune_stages = sorted_versions[-1]
        prune_version_stage = prune_stages[0]
        service_client.update_secret_version_stage(SecretId=arn, VersionStage=prune_version_stage, RemoveFromVersionId=prune_version_id)
        del sorted_versions[-1]
        logger.info("finishSecret: REMOVED Stage %s from version %s" % (prune_version_stage, prune_version_id))

    # reversed_enumerate is defined elsewhere in this file; presumably it
    # yields (offset, item) pairs from the highest offset down so that labels
    # are shifted starting from the oldest slot - TODO confirm. Only the
    # offset is used here; versions are looked up by index in sorted_versions.
    for offset, _version in reversed_enumerate(sorted_versions, start=1):
        version_stage = "AWSPREVIOUS-CUSTOM_" + str(offset)
        # The first bound keeps offset + 1 a valid index before it is read.
        if (offset < len(sorted_versions) - 1
                and offset < max_versions
                and version_stage in sorted_versions[offset + 1][1]):
            # The label currently sits on the next entry in sort order:
            # move it onto the entry at this offset.
            target_version_id = sorted_versions[offset][0]
            donor_version_id = sorted_versions[offset + 1][0]
            logger.info("finishSecret: Moving %s stage to version %s from version %s for secret %s." % (version_stage, target_version_id, donor_version_id, arn))
            service_client.update_secret_version_stage(SecretId=arn, VersionStage=version_stage, MoveToVersionId=target_version_id, RemoveFromVersionId=donor_version_id)
            logger.info("finishSecret: Successfully set %s stage to version %s for secret %s. Removed %s from version %s." % (version_stage, target_version_id, arn, version_stage, donor_version_id))
        elif offset < len(sorted_versions):
            # Attach the label to the entry at this offset without naming a
            # version to take it from.
            # NOTE(review): Secrets Manager requires RemoveFromVersionId when
            # the label is already attached to a different version - confirm
            # this branch is only reached when the label is unattached.
            target_version_id = sorted_versions[offset][0]
            logger.info("finishSecret: Setting %s stage to version %s for secret %s." % (version_stage, target_version_id, arn))
            service_client.update_secret_version_stage(SecretId=arn, VersionStage=version_stage, MoveToVersionId=target_version_id)
            logger.info("finishSecret: Successfully set %s stage to version %s for secret %s." % (version_stage, target_version_id, arn))

    # Log the end state for troubleshooting.
    metadata = service_client.describe_secret(SecretId=arn)
    sorted_versions = sorted(metadata["VersionIdsToStages"].items(), key=sort_by_tag)
    logger.info("finishSecret: final versions %s" % pformat(sorted_versions))