in SecretsManagerRDSOracleRotationSingleUser/lambda_function.py [0:0]
def set_secret(service_client, arn, token):
    """Set the pending secret in the database.

    This method tries to login to the database with the AWSPENDING secret and
    returns on success. If that fails, it tries to login with the AWSCURRENT
    and then the AWSPREVIOUS secret. If either one succeeds, it sets the
    AWSPENDING password as the user password in the database. Else, it throws
    a ValueError.

    Args:
        service_client (client): The secrets manager service client
        arn (string): The secret ARN or other identifier
        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ResourceNotFoundException: If the secret with the specified arn and
            stage does not exist (a missing AWSPREVIOUS stage is tolerated)
        ValueError: If the secret is not valid JSON, if the pending secret
            targets a different user or host than the secret used to log in,
            or if no valid credentials are found to login to the database
        KeyError: If the secret json does not contain the expected keys
    """
    # AWSPREVIOUS is optional: it is only a fallback login, so a missing or
    # incomplete previous version is not an error.
    try:
        previous_dict = get_secret_dict(service_client, arn, "AWSPREVIOUS")
    except (service_client.exceptions.ResourceNotFoundException, KeyError):
        previous_dict = None
    current_dict = get_secret_dict(service_client, arn, "AWSCURRENT")
    pending_dict = get_secret_dict(service_client, arn, "AWSPENDING", token)

    # First try to login with the pending secret, if it succeeds, return
    conn = get_connection(pending_dict)
    if conn:
        conn.close()
        logger.info("setSecret: AWSPENDING secret is already set as password in Oracle DB for secret arn %s." % arn)
        return

    # Make sure the user from current and pending match
    if current_dict['username'] != pending_dict['username']:
        logger.error("setSecret: Attempting to modify user %s other than current user %s" % (pending_dict['username'], current_dict['username']))
        raise ValueError("Attempting to modify user %s other than current user %s" % (pending_dict['username'], current_dict['username']))

    # Make sure the host from current and pending match
    if current_dict['host'] != pending_dict['host']:
        logger.error("setSecret: Attempting to modify user for host %s other than current host %s" % (pending_dict['host'], current_dict['host']))
        raise ValueError("Attempting to modify user for host %s other than current host %s" % (pending_dict['host'], current_dict['host']))

    # Now try the current password
    conn = get_connection(current_dict)
    if not conn and previous_dict:
        # Both current and pending failed, so fall back to previous. Validate
        # the user/host BEFORE connecting: a mismatching previous secret must
        # never be used to log in, and raising after a successful login would
        # leak the open connection.
        if previous_dict['username'] != pending_dict['username']:
            logger.error("setSecret: Attempting to modify user %s other than previous valid user %s" % (pending_dict['username'], previous_dict['username']))
            raise ValueError("Attempting to modify user %s other than previous valid user %s" % (pending_dict['username'], previous_dict['username']))
        if previous_dict['host'] != pending_dict['host']:
            logger.error("setSecret: Attempting to modify user for host %s other than previous host %s" % (pending_dict['host'], previous_dict['host']))
            raise ValueError("Attempting to modify user for host %s other than previous host %s" % (pending_dict['host'], previous_dict['host']))
        conn = get_connection(previous_dict)

    # If we still don't have a connection, raise a ValueError
    if not conn:
        logger.error("setSecret: Unable to log into database with previous, current, or pending secret of secret arn %s" % arn)
        raise ValueError("Unable to log into database with previous, current, or pending secret of secret arn %s" % arn)

    try:
        cur = conn.cursor()

        # Escape username via DBMS ENQUOTE_NAME
        cur.execute("SELECT sys.DBMS_ASSERT.enquote_name(:username) FROM DUAL", username=pending_dict['username'])
        escaped_username = cur.fetchone()[0]

        # Passwords cannot have double quotes in Oracle, remove any double
        # quotes to allow the password to be properly escaped. The statement
        # below must use this sanitized value: interpolating the raw password
        # would let a double quote terminate the quoted literal early.
        pending_password = pending_dict['password'].replace("\"", "")

        # Now set the password to the pending password. ALTER USER is DDL and
        # cannot take bind variables, hence the string-built statement.
        sql = "ALTER USER %s IDENTIFIED BY \"%s\"" % (escaped_username, pending_password)
        cur.execute(sql)
        conn.commit()
        logger.info("setSecret: Successfully set password for user %s in Oracle DB for secret arn %s." % (pending_dict['username'], arn))
    finally:
        # Always release the database session, on success and on failure.
        conn.close()