def set_secret()

in SecretsManagerRDSOracleRotationMultiUser/lambda_function.py [0:0]


def set_secret(service_client, arn, token):
    """Set the pending secret in the database

    This method tries to login to the database with the AWSPENDING secret and returns on success. If that fails, it
    verifies that the pending username is the alternate (clone) of the current username, that the pending and current
    hosts match, and that the AWSCURRENT credentials can log in. It then logs in with the master credentials from the
    masterarn in the current secret. If this succeeds, it either changes the password of the existing AWSPENDING user
    or creates that user and replays the grants of the AWSCURRENT user onto it. Else, it throws a ValueError.

    The master connection is always closed before this method returns or raises.

    Args:
        service_client (client): The secrets manager service client

        arn (string): The secret ARN or other identifier

        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ResourceNotFoundException: If the secret with the specified arn and stage does not exist

        ValueError: If the secret is not valid JSON, the pending user/host does not match the current secret, or the
            current or master credentials could not be used to login to DB

        KeyError: If the secret json does not contain the expected keys

    """
    current_dict = get_secret_dict(service_client, arn, "AWSCURRENT")
    pending_dict = get_secret_dict(service_client, arn, "AWSPENDING", token)

    # First try to login with the pending secret, if it succeeds, return
    conn = get_connection(pending_dict)
    if conn:
        conn.close()
        logger.info("setSecret: AWSPENDING secret is already set as password in Oracle DB for secret arn %s." % arn)
        return

    # Make sure the user from current and pending match
    if get_alt_username(current_dict['username']) != pending_dict['username']:
        logger.error("setSecret: Attempting to modify user %s other than current user or clone %s" % (pending_dict['username'], current_dict['username']))
        raise ValueError("Attempting to modify user %s other than current user or clone %s" % (pending_dict['username'], current_dict['username']))

    # Make sure the host from current and pending match
    if current_dict['host'] != pending_dict['host']:
        logger.error("setSecret: Attempting to modify user for host %s other than current host %s" % (pending_dict['host'], current_dict['host']))
        raise ValueError("Attempting to modify user for host %s other than current host %s" % (pending_dict['host'], current_dict['host']))

    # Before we do anything with the secret, make sure the AWSCURRENT secret is valid by logging in to the db
    # This ensures that the credential we are rotating is valid to protect against a confused deputy attack
    conn = get_connection(current_dict)
    if not conn:
        logger.error("setSecret: Unable to log into database using current credentials for secret %s" % arn)
        raise ValueError("Unable to log into database using current credentials for secret %s" % arn)
    conn.close()

    # Now get the master arn from the current secret
    master_arn = current_dict['masterarn']
    master_dict = get_secret_dict(service_client, master_arn, "AWSCURRENT")
    if current_dict['host'] != master_dict['host'] and not is_rds_replica_database(current_dict, master_dict):
        # Only proceed when the hosts are identical or current dict is a replica of the master dict
        logger.error("setSecret: Current database host %s is not the same host as/rds replica of master %s" % (current_dict['host'], master_dict['host']))
        raise ValueError("Current database host %s is not the same host as/rds replica of master %s" % (current_dict['host'], master_dict['host']))

    # Now log into the database with the master credentials
    conn = get_connection(master_dict)
    if not conn:
        logger.error("setSecret: Unable to log into database using credentials in master secret %s" % master_arn)
        raise ValueError("Unable to log into database using credentials in master secret %s" % master_arn)

    # Everything below uses the master connection; release it no matter how we leave this section
    try:
        cur = conn.cursor()

        # Escape username via DBMS ENQUOTE_NAME
        cur.execute("SELECT sys.DBMS_ASSERT.ENQUOTE_NAME(:username) FROM DUAL", username=pending_dict['username'])
        escaped_username = cur.fetchone()[0]

        # Escape current username via DBMS ENQUOTE_NAME
        cur.execute("SELECT sys.DBMS_ASSERT.ENQUOTE_NAME(:username) FROM DUAL", username=current_dict['username'])
        escaped_current = cur.fetchone()[0]

        # Passwords cannot have double quotes in Oracle, remove any double quotes to allow the password to be properly escaped
        pending_password = pending_dict['password'].replace("\"", "")

        # Check to see if the user already exists
        cur.execute("SELECT USERNAME FROM DBA_USERS WHERE USERNAME=:username", username=pending_dict['username'].upper())
        if cur.fetchall():
            # If user exists, just change their password
            cur.execute("ALTER USER %s IDENTIFIED BY \"%s\"" % (escaped_username, pending_password))
        else:
            # If user does not exist, create the user with appropriate grants
            cur.execute("CREATE USER %s IDENTIFIED BY \"%s\"" % (escaped_username, pending_password))
            for grant_type in ['ROLE_GRANT', 'SYSTEM_GRANT', 'OBJECT_GRANT']:
                try:
                    cur.execute("SELECT DBMS_METADATA.GET_GRANTED_DDL(:grant_type, :username) FROM DUAL", grant_type=grant_type,
                                username=current_dict['username'].upper())
                    # Fetch every DDL row up front because the same cursor is reused to run the grants
                    grant_rows = cur.fetchall()
                    for row in grant_rows:
                        # Each row holds a LOB with the current user's grant DDL; retarget it at the pending user
                        sql = row[0].read().strip(' \n\t').replace(escaped_current, escaped_username)
                        cur.execute(sql)
                except cx_Oracle.DatabaseError as err:
                    # Best effort: if we were unable to find or apply grants of this type, skip it but leave a trace
                    logger.warning("setSecret: Skipping %s for user %s: %s" % (grant_type, pending_dict['username'], err))
        conn.commit()
    finally:
        conn.close()
    logger.info("setSecret: Successfully set password for %s in Oracle DB for secret arn %s." % (pending_dict['username'], arn))