in SecretsManagerRDSMariaDBRotationSingleUser/lambda_function.py [0:0]
def set_secret(service_client, arn, token):
    """Set the pending secret in the database

    This method tries to login to the database with the AWSPENDING secret and returns on success. If that fails, it
    tries to login with the AWSCURRENT and then the AWSPREVIOUS secret. If either one succeeds, it sets the AWSPENDING
    password as the user password in the database. Else, it throws a ValueError.

    The username and host stored in AWSPENDING must match those of whichever secret is used to log in; the password
    is only ever changed for the user the existing secret already refers to.

    Args:
        service_client (client): The secrets manager service client
        arn (string): The secret ARN or other identifier
        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ResourceNotFoundException: If the secret with the specified arn and stage does not exist
        ValueError: If the secret is not valid JSON, if the pending secret targets a different user or host than the
            secret used to log in, or if no valid credentials are found to login to the database
        KeyError: If the secret json does not contain the expected keys
    """
    # AWSPREVIOUS is optional: a missing (or incomplete) previous version just disables that fallback.
    try:
        previous_dict = get_secret_dict(service_client, arn, "AWSPREVIOUS")
    except (service_client.exceptions.ResourceNotFoundException, KeyError):
        previous_dict = None
    current_dict = get_secret_dict(service_client, arn, "AWSCURRENT")
    pending_dict = get_secret_dict(service_client, arn, "AWSPENDING", token)

    # First try to login with the pending secret, if it succeeds, return.
    # get_connection is expected to hand back a falsy value when the login fails.
    conn = get_connection(pending_dict)
    if conn:
        conn.close()
        logger.info("setSecret: AWSPENDING secret is already set as password in MariaDB DB for secret arn %s.", arn)
        return

    # Make sure the user from current and pending match
    if current_dict['username'] != pending_dict['username']:
        logger.error("setSecret: Attempting to modify user %s other than current user %s",
                     pending_dict['username'], current_dict['username'])
        raise ValueError("Attempting to modify user %s other than current user %s"
                         % (pending_dict['username'], current_dict['username']))

    # Make sure the host from current and pending match
    if current_dict['host'] != pending_dict['host']:
        logger.error("setSecret: Attempting to modify user for host %s other than current host %s",
                     pending_dict['host'], current_dict['host'])
        raise ValueError("Attempting to modify user for host %s other than current host %s"
                         % (pending_dict['host'], current_dict['host']))

    # Now try the current password
    conn = get_connection(current_dict)

    # If both current and pending do not work, try previous
    if not conn and previous_dict:
        # Make sure the user and host from previous and pending match. This is checked BEFORE opening the
        # connection so that a mismatch cannot leave an open connection behind when the ValueError is raised.
        if previous_dict['username'] != pending_dict['username']:
            logger.error("setSecret: Attempting to modify user %s other than last valid user %s",
                         pending_dict['username'], previous_dict['username'])
            raise ValueError("Attempting to modify user %s other than last valid user %s"
                             % (pending_dict['username'], previous_dict['username']))
        if previous_dict['host'] != pending_dict['host']:
            logger.error("setSecret: Attempting to modify user for host %s other than previous host %s",
                         pending_dict['host'], previous_dict['host'])
            raise ValueError("Attempting to modify user for host %s other than previous host %s"
                             % (pending_dict['host'], previous_dict['host']))

        # Update previous_dict to leverage current SSL settings
        previous_dict.pop('ssl', None)
        if 'ssl' in current_dict:
            previous_dict['ssl'] = current_dict['ssl']

        conn = get_connection(previous_dict)

    # If we still don't have a connection, raise a ValueError
    if not conn:
        logger.error("setSecret: Unable to log into database with previous, current, or pending secret of secret arn %s", arn)
        raise ValueError("Unable to log into database with previous, current, or pending secret of secret arn %s" % arn)

    # Now set the password to the pending password
    try:
        with conn.cursor() as cur:
            # The password is handed to the driver as a query parameter rather than formatted into the SQL text.
            cur.execute("SET PASSWORD = PASSWORD(%s)", pending_dict['password'])
            conn.commit()
            logger.info("setSecret: Successfully set password for user %s in MariaDB DB for secret arn %s.",
                        pending_dict['username'], arn)
    finally:
        # Always release the connection, whether or not the password change succeeded.
        conn.close()