in SecretsManagerRDSMySQLRotationMultiUser/lambda_function.py [0:0]
def set_secret(service_client, arn, token):
    """Apply the AWSPENDING credentials to the MySQL database.

    If a login with the AWSPENDING secret already succeeds there is nothing
    to do and the function returns. Otherwise the pending secret is checked
    against the AWSCURRENT secret (the pending username must equal the result
    of ``get_alt_username`` for the current username, and both secrets must
    name the same host), the AWSCURRENT credentials are verified by logging
    in, and the secret referenced by ``masterarn`` is used to open a
    connection. Over that connection the pending user is created when it is
    missing, is given a copy of the current user's grants and TLS
    requirements, and has its password set to the pending password.

    Args:
        service_client (client): The secrets manager service client
        arn (string): The secret ARN or other identifier
        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ValueError: If the pending user or host does not correspond to the
            current secret, if the current or master credentials cannot be
            used to log in, or if the current host is neither the master
            host nor an RDS replica of it
        KeyError: If a secret does not contain one of the expected keys

    Errors raised by ``get_secret_dict`` propagate unchanged (per the original
    documentation: ResourceNotFoundException when the secret/stage does not
    exist, ValueError when the secret is not valid JSON).
    """
    current_dict = get_secret_dict(service_client, arn, "AWSCURRENT")
    pending_dict = get_secret_dict(service_client, arn, "AWSPENDING", token)

    # Nothing to do when the pending credentials can already log in.
    pending_conn = get_connection(pending_dict)
    if pending_conn:
        pending_conn.close()
        logger.info("setSecret: AWSPENDING secret is already set as password in MySQL DB for secret arn %s." % arn)
        return

    # The pending user must be the alternate name derived from the current user.
    expected_user = get_alt_username(current_dict['username'])
    pending_user = pending_dict['username']
    if expected_user != pending_user:
        user_pair = (pending_user, current_dict['username'])
        logger.error("setSecret: Attempting to modify user %s other than current user or clone %s" % user_pair)
        raise ValueError("Attempting to modify user %s other than current user or clone %s" % user_pair)

    # Both secrets must point at the same host.
    current_host = current_dict['host']
    pending_host = pending_dict['host']
    if current_host != pending_host:
        host_pair = (pending_host, current_host)
        logger.error("setSecret: Attempting to modify user for host %s other than current host %s" % host_pair)
        raise ValueError("Attempting to modify user for host %s other than current host %s" % host_pair)

    # Prove the AWSCURRENT credentials really work before touching anything;
    # this guards against a confused deputy attack.
    current_conn = get_connection(current_dict)
    if not current_conn:
        logger.error("setSecret: Unable to log into database using current credentials for secret %s" % arn)
        raise ValueError("Unable to log into database using current credentials for secret %s" % arn)
    current_conn.close()

    # Load the master secret named by the current secret.
    master_arn = current_dict['masterarn']
    master_dict = get_secret_dict(service_client, master_arn, "AWSCURRENT")

    # A differing host is acceptable only when it is an RDS replica of the master.
    hosts_differ = current_dict['host'] != master_dict['host']
    if hosts_differ and not is_rds_replica_database(current_dict, master_dict):
        master_pair = (current_dict['host'], master_dict['host'])
        logger.error("setSecret: Current database host %s is not the same host as/rds replica of master %s" % master_pair)
        raise ValueError("Current database host %s is not the same host as/rds replica of master %s" % master_pair)

    # Connect with the master credentials.
    master_conn = get_connection(master_dict)
    if not master_conn:
        logger.error("setSecret: Unable to log into database using credentials in master secret %s" % master_arn)
        raise ValueError("Unable to log into database using credentials in master secret %s" % master_arn)

    try:
        with master_conn.cursor() as cur:
            # Create the pending user when no row for it exists yet.
            cur.execute("SELECT User FROM mysql.user WHERE User = %s", pending_user)
            if cur.rowcount == 0:
                cur.execute("CREATE USER %s IDENTIFIED BY %s", (pending_user, pending_dict['password']))

            # Replay each of the current user's grants for the pending user:
            # keep the text before ' TO ' and re-target it.
            cur.execute("SHOW GRANTS FOR %s", current_dict['username'])
            for grant_row in cur.fetchall():
                privileges_clause = grant_row[0].split(' TO ')[0]
                # % is a special character in Python format strings.
                cur.execute(privileges_clause.replace('%', '%%') + " TO %s", (pending_user,))

            # The server version selects the statement syntax used below.
            cur.execute("SELECT VERSION()")
            ver = cur.fetchone()[0]

            # Mirror the current user's TLS requirements onto the pending user.
            escaped_encryption_statement = get_escaped_encryption_statement(ver)
            cur.execute("SELECT ssl_type, ssl_cipher, x509_issuer, x509_subject FROM mysql.user WHERE User = %s", current_dict['username'])
            tls_options = cur.fetchone()
            ssl_type = tls_options[0]
            if not ssl_type:
                tls_clause, tls_args = " NONE", pending_user
            elif ssl_type == "ANY":
                tls_clause, tls_args = " SSL", pending_user
            elif ssl_type == "X509":
                tls_clause, tls_args = " X509", pending_user
            else:
                # Any other ssl_type: carry over cipher, issuer and subject.
                tls_clause = " CIPHER %s AND ISSUER %s AND SUBJECT %s"
                tls_args = (pending_user, tls_options[1], tls_options[2], tls_options[3])
            cur.execute(escaped_encryption_statement + tls_clause, tls_args)

            # Finally set the pending password and commit the changes.
            password_option = get_password_option(ver)
            cur.execute("SET PASSWORD FOR %s = " + password_option, (pending_user, pending_dict['password']))
            master_conn.commit()
            logger.info("setSecret: Successfully set password for %s in MySQL DB for secret arn %s." % (pending_user, arn))
    finally:
        # Always release the master connection, whether or not the update succeeded.
        master_conn.close()