def set_secret()

in SecretsManagerRDSSQLServerRotationSingleUser/lambda_function.py [0:0]


def set_secret(service_client, arn, token):
    """Set the pending secret in the database

    This method tries to login to the database with the AWSPENDING secret and returns on success. If that fails, it
    tries to login with the AWSCURRENT and AWSPREVIOUS secrets. If either one succeeds, it sets the AWSPENDING password
    as the user password in the database. Else, it throws a ValueError.

    Args:
        service_client (client): The secrets manager service client

        arn (string): The secret ARN or other identifier

        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ResourceNotFoundException: If the secret with the specified arn and stage does not exist

        ValueError: If the secret is not valid JSON or valid credentials are found to login to the database

        KeyError: If the secret json does not contain the expected keys

    """
    try:
        previous_dict = get_secret_dict(service_client, arn, "AWSPREVIOUS")
    except (service_client.exceptions.ResourceNotFoundException, KeyError):
        previous_dict = None
    current_dict = get_secret_dict(service_client, arn, "AWSCURRENT")
    pending_dict = get_secret_dict(service_client, arn, "AWSPENDING", token)

    # First try to login with the pending secret, if it succeeds, return
    conn = get_connection(pending_dict)
    if conn:
        conn.close()
        logger.info("setSecret: AWSPENDING secret is already set as password in SQL Server DB for secret arn %s." % arn)
        return

    # Make sure the user from current and pending match
    if current_dict['username'] != pending_dict['username']:
        logger.error("setSecret: Attempting to modify user %s other than current user %s" % (pending_dict['username'], current_dict['username']))
        raise ValueError("Attempting to modify user %s other than current user %s" % (pending_dict['username'], current_dict['username']))

    # Make sure the host from current and pending match
    if current_dict['host'] != pending_dict['host']:
        logger.error("setSecret: Attempting to modify user for host %s other than current host %s" % (pending_dict['host'], current_dict['host']))
        raise ValueError("Attempting to modify user for host %s other than current host %s" % (pending_dict['host'], current_dict['host']))

    # Now try the current password
    conn = get_connection(current_dict)

    # If both current and pending do not work, try previous
    if not conn and previous_dict:
        # Update previous_dict to leverage current SSL settings
        previous_dict.pop('ssl', None)
        if 'ssl' in current_dict:
            previous_dict['ssl'] = current_dict['ssl']

        conn = get_connection(previous_dict)

        # Make sure the user/host from previous and pending match
        if previous_dict['username'] != pending_dict['username']:
            logger.error("setSecret: Attempting to modify user %s other than previous valid user %s" % (pending_dict['username'], previous_dict['username']))
            raise ValueError("Attempting to modify user %s other than previous valid user %s" % (pending_dict['username'], previous_dict['username']))
        if previous_dict['host'] != pending_dict['host']:
            logger.error("setSecret: Attempting to modify user for host %s other than previous host %s" % (pending_dict['host'], previous_dict['host']))
            raise ValueError("Attempting to modify user for host %s other than previous host %s" % (pending_dict['host'], previous_dict['host']))

    # If we still don't have a connection, raise a ValueError
    if not conn:
        logger.error("setSecret: Unable to log into database with previous, current, or pending secret of secret arn %s" % arn)
        raise ValueError("Unable to log into database with previous, current, or pending secret of secret arn %s" % arn)

    # Now set the password to the pending password
    try:
        with conn.cursor() as cursor:
            # Get escaped username via QUOTENAME
            cursor.execute("SELECT QUOTENAME(%s) AS QUOTENAME", (current_dict['username'],))
            escaped_username = cursor.fetchone()['QUOTENAME']

            # Get the current version and db
            cursor.execute("SELECT @@VERSION AS version")
            version = cursor.fetchall()[0]['version']
            cursor.execute("SELECT DB_NAME() AS name")
            current_db = cursor.fetchall()[0]['name']

            # Determine if we are in a contained DB
            containment = 0
            if not version.startswith("Microsoft SQL Server 2008"):  # SQL Server 2008 does not support contained databases
                cursor.execute("SELECT containment FROM sys.databases WHERE name = %s", current_db)
                containment = cursor.fetchall()[0]['containment']

            # Set the user or login password (depending on database containment)
            if containment == 0:
                alter_stmt = "ALTER LOGIN %s" % escaped_username
                cursor.execute(alter_stmt + " WITH PASSWORD = %s OLD_PASSWORD = %s", (pending_dict['password'], current_dict['password']))
            else:
                alter_stmt = "ALTER USER %s" % escaped_username
                cursor.execute(alter_stmt + " WITH PASSWORD = %s OLD_PASSWORD = %s", (pending_dict['password'], current_dict['password']))

            conn.commit()
            logger.info("setSecret: Successfully set password for user %s in SQL Server DB for secret arn %s." % (pending_dict['username'], arn))
    finally:
        conn.close()