in SecretsManagerRDSPostgreSQLRotationMultiUser/lambda_function.py [0:0]
def set_secret(service_client, arn, token):
    """Apply the AWSPENDING credentials to the PostgreSQL database.

    The steps, in order:

    1. Try to connect with the AWSPENDING secret. If that works, the pending
       password is already active and the function returns.
    2. Check that the pending username equals the alternate name derived from
       the current username, and that both secret versions name the same host.
    3. Confirm the AWSCURRENT secret can still log in, so that only a valid
       credential can trigger a change (confused deputy protection).
    4. Load the master secret referenced by ``masterarn`` in the current
       secret, require its host to be the current host or the database the
       current host is an RDS replica of, and connect with it.
    5. Using the master connection, either create the pending role and grant
       it the current role, or (if the role already exists) change its
       password. Commit on success.

    Args:
        service_client (client): The secrets manager service client
        arn (string): The secret ARN or other identifier
        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ValueError: If the pending user or host does not match the current
            secret, the current or master credentials cannot log in, or the
            master host is unrelated to the current host.
        KeyError: If a secret dictionary lacks a key read here
            ('username', 'host', 'masterarn', 'password').

    Exceptions raised by get_secret_dict (for example when the secret or
    stage cannot be found, or the secret is not valid JSON) are not caught
    here and propagate to the caller.
    """
    current_secret = get_secret_dict(service_client, arn, "AWSCURRENT")
    pending_secret = get_secret_dict(service_client, arn, "AWSPENDING", token)

    # Step 1: nothing to do when the pending credentials already work.
    pending_conn = get_connection(pending_secret)
    if pending_conn:
        pending_conn.close()
        logger.info("setSecret: AWSPENDING secret is already set as password in PostgreSQL DB for secret arn %s." % arn)
        return

    # Step 2a: the pending user must be the alternate of the current user.
    if get_alt_username(current_secret['username']) != pending_secret['username']:
        user_pair = (pending_secret['username'], current_secret['username'])
        logger.error(
            "setSecret: Attempting to modify user %s other than current user or clone %s" % user_pair)
        raise ValueError(
            "Attempting to modify user %s other than current user or clone %s" % user_pair)

    # Step 2b: both versions of the secret must target the same host.
    if current_secret['host'] != pending_secret['host']:
        host_pair = (pending_secret['host'], current_secret['host'])
        logger.error("setSecret: Attempting to modify user for host %s other than current host %s" % host_pair)
        raise ValueError("Attempting to modify user for host %s other than current host %s" % host_pair)

    # Step 3: prove the credential being rotated is itself valid before
    # touching anything (guards against a confused deputy attack).
    current_conn = get_connection(current_secret)
    if not current_conn:
        logger.error("setSecret: Unable to log into database using current credentials for secret %s" % arn)
        raise ValueError("Unable to log into database using current credentials for secret %s" % arn)
    current_conn.close()

    # Step 4: fetch the master secret and make sure it belongs to this
    # database (same host, or the current host is an RDS replica of it).
    master_arn = current_secret['masterarn']
    master_secret = get_secret_dict(service_client, master_arn, "AWSCURRENT")
    same_host = current_secret['host'] == master_secret['host']
    if not (same_host or is_rds_replica_database(current_secret, master_secret)):
        master_pair = (current_secret['host'], master_secret['host'])
        logger.error("setSecret: Current database host %s is not the same host as/rds replica of master %s" % master_pair)
        raise ValueError("Current database host %s is not the same host as/rds replica of master %s" % master_pair)

    master_conn = get_connection(master_secret)
    if not master_conn:
        logger.error("setSecret: Unable to log into database using credentials in master secret %s" % master_arn)
        raise ValueError("Unable to log into database using credentials in master secret %s" % master_arn)

    # Step 5: create or update the pending role as the master user.
    try:
        with master_conn.cursor() as cursor:
            # Role names are interpolated into the statements as identifiers,
            # so have the server escape them with quote_ident first. The
            # password is always passed as a bound parameter.
            cursor.execute("SELECT quote_ident(%s)", (pending_secret['username'],))
            quoted_pending = cursor.fetchone()[0]
            cursor.execute("SELECT quote_ident(%s)", (current_secret['username'],))
            quoted_current = cursor.fetchone()[0]

            cursor.execute("SELECT 1 FROM pg_roles where rolname = %s", (pending_secret['username'],))
            matching_roles = cursor.fetchall()
            if len(matching_roles) != 0:
                # Role already exists: only its password needs to change.
                alter_role = "ALTER USER %s" % quoted_pending
                cursor.execute(alter_role + " WITH PASSWORD %s", (pending_secret['password'],))
            else:
                # Role is missing: create it, then grant it membership in the
                # current role so it receives that role's permissions.
                create_role = "CREATE ROLE %s" % quoted_pending
                cursor.execute(create_role + " WITH LOGIN PASSWORD %s", (pending_secret['password'],))
                cursor.execute("GRANT %s TO %s" % (quoted_current, quoted_pending))

            master_conn.commit()
            logger.info("setSecret: Successfully set password for %s in PostgreSQL DB for secret arn %s." % (pending_secret['username'], arn))
    finally:
        # Always release the master connection, whether or not the change succeeded.
        master_conn.close()