def finish_secret()

in functions/source/rotate_sealer_key_secret/lambda_function.py [0:0]


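def finish_secret(service_client, arn: str, token: str) -> None:
    """Finish the secret

    This method finalizes the rotation process by marking the secret version passed in as the AWSCURRENT secret.
    It then prunes staging labels from versions beyond the configured count and re-labels the remaining
    versions as AWSPREVIOUS-CUSTOM_<n>, so that only the intended number of sealer key versions stay
    reachable through a staging label.

    Args:
        service_client (client): The secrets manager service client

        arn (string): The secret ARN or other identifier

        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ResourceNotFoundException: If the secret with the specified arn does not exist
        KeyError: If the SEALER_KEY_VERSION_COUNT environment variable is not set
        ValueError: If SEALER_KEY_VERSION_COUNT is not an integer

    """
    # A single describe_secret call serves both the initial log line and the AWSCURRENT lookup,
    # so both work from the same snapshot of the secret's version/stage map.
    metadata = service_client.describe_secret(SecretId=arn)
    versions_to_stages = metadata["VersionIdsToStages"]
    sorted_versions = sorted(versions_to_stages.items(), key=sort_by_tag)
    logger.info("finishSecret: initial versions %s", pformat(sorted_versions))

    # Find the version that currently carries AWSCURRENT.
    current_version = None
    for version, stages in versions_to_stages.items():
        if "AWSCURRENT" in stages:
            if version == token:
                # The correct version is already marked as current, return
                logger.info("finishSecret: Version %s already marked as AWSCURRENT for %s", version, arn)
                return
            current_version = version
            break

    # Finalize by staging the secret version current.
    # NOTE(review): if no version carried AWSCURRENT, current_version is still None and is passed
    # through as RemoveFromVersionId — confirm that is the intended behaviour for a first rotation.
    service_client.update_secret_version_stage(
        SecretId=arn,
        VersionStage="AWSCURRENT",
        MoveToVersionId=token,
        RemoveFromVersionId=current_version,
    )
    logger.info(
        "finishSecret: Successfully set AWSCURRENT stage to version %s for secret %s.  Removed AWSCURRENT from version %s.",
        token,
        arn,
        current_version,
    )

    # Bump all the previous versions as required by the configuration.
    # Re-read the secret: the stage move above changed the version/stage map.
    metadata = service_client.describe_secret(SecretId=arn)
    sorted_versions = sorted(metadata["VersionIdsToStages"].items(), key=sort_by_tag)
    max_versions = int(os.environ['SEALER_KEY_VERSION_COUNT'])

    # Prune from the tail of the sorted list until only max_versions entries remain. Only the
    # first staging label of each pruned version is removed; a version left with no staging
    # label is deprecated by Secrets Manager (per its documented behaviour) and will age out.
    # NOTE(review): assumes sort_by_tag orders the AWSCURRENT version at the head, so pruning
    # from the tail never strips the label from the version just promoted — confirm.
    while len(sorted_versions) > max_versions:
        prune_version_id, prune_stages = sorted_versions[-1]
        prune_version_stage = prune_stages[0]
        service_client.update_secret_version_stage(
            SecretId=arn,
            VersionStage=prune_version_stage,
            RemoveFromVersionId=prune_version_id,
        )
        del sorted_versions[-1]
        logger.info("finishSecret: REMOVED Stage %s from version %s", prune_version_stage, prune_version_id)

    # Re-label the remaining versions as AWSPREVIOUS-CUSTOM_<offset>. The body indexes
    # sorted_versions by offset rather than using the item yielded by reversed_enumerate.
    for offset, _version in reversed_enumerate(sorted_versions, start=1):
        version_stage = "AWSPREVIOUS-CUSTOM_" + str(offset)
        if (
            offset < len(sorted_versions) - 1
            and offset < max_versions
            and version_stage in sorted_versions[offset + 1][1]
        ):
            # The label currently sits on the next entry in the list: move it onto this one.
            logger.info(
                "finishSecret: Moving %s stage to version %s from version %s for secret %s.",
                version_stage,
                sorted_versions[offset][0],
                sorted_versions[offset + 1][0],
                arn,
            )
            service_client.update_secret_version_stage(
                SecretId=arn,
                VersionStage=version_stage,
                MoveToVersionId=sorted_versions[offset][0],
                RemoveFromVersionId=sorted_versions[offset + 1][0],
            )
            logger.info(
                "finishSecret: Successfully set %s stage to version %s for secret %s.  Removed %s from version %s.",
                version_stage,
                sorted_versions[offset][0],
                arn,
                version_stage,
                sorted_versions[offset + 1][0],
            )
        elif offset < len(sorted_versions):
            # No older holder of this label: attach it directly.
            logger.info(
                "finishSecret: Setting %s stage to version %s for secret %s.",
                version_stage,
                sorted_versions[offset][0],
                arn,
            )
            service_client.update_secret_version_stage(
                SecretId=arn,
                VersionStage=version_stage,
                MoveToVersionId=sorted_versions[offset][0],
            )
            logger.info(
                "finishSecret: Successfully set %s stage to version %s for secret %s.",
                version_stage,
                sorted_versions[offset][0],
                arn,
            )

    # Log the final layout so the rotation's effect on every version is auditable.
    metadata = service_client.describe_secret(SecretId=arn)
    sorted_versions = sorted(metadata["VersionIdsToStages"].items(), key=sort_by_tag)
    logger.info("finishSecret: final versions %s", pformat(sorted_versions))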