in SecretsManagerRDSSQLServerRotationSingleUser/lambda_function.py [0:0]
def set_secret(service_client, arn, token):
    """Set the pending secret in the database.

    This method tries to login to the database with the AWSPENDING secret and returns on success. If that fails, it
    tries to login with the AWSCURRENT secret and then, if one exists, the AWSPREVIOUS secret. If either one
    succeeds, it sets the AWSPENDING password as the user password in the database, supplying the password of the
    secret that actually logged in as OLD_PASSWORD. Else, it throws a ValueError.

    Args:
        service_client (client): The secrets manager service client
        arn (string): The secret ARN or other identifier
        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ResourceNotFoundException: If the secret with the specified arn and stage does not exist
        ValueError: If the secret is not valid JSON, if the pending secret targets a different user or host than
            the secret used to login, or if no valid credentials are found to login to the database
        KeyError: If the secret json does not contain the expected keys
    """
    # AWSPREVIOUS is optional: a missing or incomplete previous version just disables the fallback below.
    try:
        previous_dict = get_secret_dict(service_client, arn, "AWSPREVIOUS")
    except (service_client.exceptions.ResourceNotFoundException, KeyError):
        previous_dict = None
    current_dict = get_secret_dict(service_client, arn, "AWSCURRENT")
    pending_dict = get_secret_dict(service_client, arn, "AWSPENDING", token)

    # First try to login with the pending secret, if it succeeds, there is nothing left to do
    conn = get_connection(pending_dict)
    if conn:
        conn.close()
        logger.info("setSecret: AWSPENDING secret is already set as password in SQL Server DB for secret arn %s.", arn)
        return

    # Make sure the user from current and pending match
    if current_dict['username'] != pending_dict['username']:
        message = "Attempting to modify user %s other than current user %s" % (
            pending_dict['username'], current_dict['username'])
        logger.error("setSecret: %s", message)
        raise ValueError(message)

    # Make sure the host from current and pending match
    if current_dict['host'] != pending_dict['host']:
        message = "Attempting to modify user for host %s other than current host %s" % (
            pending_dict['host'], current_dict['host'])
        logger.error("setSecret: %s", message)
        raise ValueError(message)

    # Now try the current password. login_dict tracks the secret that actually authenticated, because its
    # password is the one the server currently holds and therefore the one OLD_PASSWORD must carry.
    conn = get_connection(current_dict)
    login_dict = current_dict

    # If both current and pending do not work, try previous
    if not conn and previous_dict:
        # Validate the user/host BEFORE connecting so that a mismatch can neither leak an open
        # connection nor cause a login attempt against an unexpected host.
        if previous_dict['username'] != pending_dict['username']:
            message = "Attempting to modify user %s other than previous valid user %s" % (
                pending_dict['username'], previous_dict['username'])
            logger.error("setSecret: %s", message)
            raise ValueError(message)
        if previous_dict['host'] != pending_dict['host']:
            message = "Attempting to modify user for host %s other than previous host %s" % (
                pending_dict['host'], previous_dict['host'])
            logger.error("setSecret: %s", message)
            raise ValueError(message)

        # Update previous_dict to leverage current SSL settings
        previous_dict.pop('ssl', None)
        if 'ssl' in current_dict:
            previous_dict['ssl'] = current_dict['ssl']

        conn = get_connection(previous_dict)
        login_dict = previous_dict

    # If we still don't have a connection, raise a ValueError
    if not conn:
        message = "Unable to log into database with previous, current, or pending secret of secret arn %s" % arn
        logger.error("setSecret: %s", message)
        raise ValueError(message)

    # Now set the password to the pending password
    try:
        # NOTE(review): rows are read by column name below, so the cursor is assumed to return
        # dict-like rows (presumably configured in get_connection) - confirm.
        with conn.cursor() as cursor:
            # Let the server bracket-escape the username via QUOTENAME, since an identifier
            # cannot be passed as a bound parameter in the ALTER statement.
            cursor.execute("SELECT QUOTENAME(%s) AS QUOTENAME", (current_dict['username'],))
            escaped_username = cursor.fetchone()['QUOTENAME']

            # Get the current version and db
            cursor.execute("SELECT @@VERSION AS version")
            version = cursor.fetchall()[0]['version']
            cursor.execute("SELECT DB_NAME() AS name")
            current_db = cursor.fetchall()[0]['name']

            # Determine if we are in a contained DB
            containment = 0
            if not version.startswith("Microsoft SQL Server 2008"):  # SQL Server 2008 does not support contained databases
                cursor.execute("SELECT containment FROM sys.databases WHERE name = %s", (current_db,))
                containment = cursor.fetchall()[0]['containment']

            # Set the login password (non-contained DB) or the user password (contained DB)
            principal_kind = "LOGIN" if containment == 0 else "USER"
            alter_stmt = "ALTER %s %s" % (principal_kind, escaped_username)
            cursor.execute(
                alter_stmt + " WITH PASSWORD = %s OLD_PASSWORD = %s",
                (pending_dict['password'], login_dict['password']),
            )

            conn.commit()
            logger.info(
                "setSecret: Successfully set password for user %s in SQL Server DB for secret arn %s.",
                pending_dict['username'], arn,
            )
    finally:
        conn.close()