def set_secret()

in SecretsManagerRedshiftRotationMultiUser/lambda_function.py [0:0]


def set_secret(service_client, arn, token):
    """Set the pending secret in the database

    This method tries to login to the database with the AWSPENDING secret and returns on success. If that fails, it
    validates that the AWSPENDING user/host are consistent with AWSCURRENT, verifies that the AWSCURRENT credentials
    can log in, and then logs in with the master credentials referenced by 'masterarn' in the current secret. With the
    master connection it either creates the AWSPENDING user (copying the database, table and schema grants held by the
    AWSCURRENT user) or, if the user already exists, only updates its password. Any validation or login failure
    raises a ValueError.

    Args:
        service_client (client): The secrets manager service client

        arn (string): The secret ARN or other identifier

        token (string): The ClientRequestToken associated with the secret version

    Returns:
        None

    Raises:
        ResourceNotFoundException: If the secret with the specified arn and stage does not exist

        ValueError: If the secret is not valid JSON, the pending user/host do not match the current secret, or the
            current or master credentials could not be used to login to DB

        KeyError: If the secret json does not contain the expected keys

    """
    current_dict = get_secret_dict(service_client, arn, "AWSCURRENT")
    pending_dict = get_secret_dict(service_client, arn, "AWSPENDING", token)

    # First try to login with the pending secret, if it succeeds there is nothing left to do
    conn = get_connection(pending_dict)
    if conn:
        conn.close()
        logger.info("setSecret: AWSPENDING secret is already set as password in Redshift DB for secret arn %s.", arn)
        return

    # Make sure the user from current and pending match (pending must be the alternate/clone of current)
    if get_alt_username(current_dict['username']) != pending_dict['username']:
        logger.error("setSecret: Attempting to modify user %s other than current user or clone %s",
                     pending_dict['username'], current_dict['username'])
        raise ValueError(
            "Attempting to modify user %s other than current user or clone %s" % (pending_dict['username'], current_dict['username']))

    # Make sure the host from current and pending match
    if current_dict['host'] != pending_dict['host']:
        logger.error("setSecret: Attempting to modify user for host %s other than current host %s",
                     pending_dict['host'], current_dict['host'])
        raise ValueError("Attempting to modify user for host %s other than current host %s" % (pending_dict['host'], current_dict['host']))

    # Before we do anything with the secret, make sure the AWSCURRENT secret is valid by logging in to the db
    # This ensures that the credential we are rotating is valid to protect against a confused deputy attack
    conn = get_connection(current_dict)
    if not conn:
        logger.error("setSecret: Unable to log into database using current credentials for secret %s", arn)
        raise ValueError("Unable to log into database using current credentials for secret %s" % arn)
    conn.close()

    # Now get the master arn from the current secret; the master secret must point at the same host
    master_arn = current_dict['masterarn']
    master_dict = get_secret_dict(service_client, master_arn, "AWSCURRENT")
    if current_dict['host'] != master_dict['host']:
        logger.error("setSecret: Current database host %s is not the same host as master %s",
                     current_dict['host'], master_dict['host'])
        raise ValueError("Current database host %s is not the same host as master %s" % (current_dict['host'], master_dict['host']))

    # Now log into the database with the master credentials
    conn = get_connection(master_dict)
    if not conn:
        logger.error("setSecret: Unable to log into database using credentials in master secret %s", master_arn)
        raise ValueError("Unable to log into database using credentials in master secret %s" % master_arn)

    # Now set the password to the pending password; the master connection is always closed afterwards
    try:
        with conn.cursor() as cur:
            # Let the database escape the username so it can be embedded in the DDL statements below
            cur.execute("SELECT quote_ident(%s)", (pending_dict['username'],))
            pending_username = cur.fetchone()[0]

            # Check if the user exists, if not create it and grant it all permissions from the current role
            # If the user exists, just update the password
            cur.execute("SELECT usename FROM pg_user where usename = %s", (pending_dict['username'],))
            if not cur.fetchall():
                create_role = "CREATE USER %s" % pending_username
                cur.execute(create_role + " WITH PASSWORD %s", (pending_dict['password'],))
                _grant_current_privileges(cur, current_dict['username'], pending_username)
            else:
                alter_role = "ALTER USER %s" % pending_username
                cur.execute(alter_role + " WITH PASSWORD %s", (pending_dict['password'],))

            # Nothing is committed unless every statement above succeeded
            conn.commit()
            logger.info("setSecret: Successfully set password for %s in Redshift DB for secret arn %s.", pending_dict['username'], arn)
    finally:
        conn.close()


def _grant_current_privileges(cur, current_username, pending_username):
    """Grant the pending user each database, table and schema privilege the current user holds

    For every privilege type, the catalog is queried for the objects on which current_username has that privilege
    and a single GRANT covering all of those objects is issued to pending_username. Nothing is committed here; the
    caller owns the transaction.

    Args:
        cur (cursor): An open cursor on a connection with sufficient rights to issue GRANT statements. Result rows
            are read by column name as attributes (NOTE(review): assumes the driver returns named rows - confirm)

        current_username (string): The raw (unescaped) username whose privileges are copied; passed as a bind parameter

        pending_username (string): The already quote_ident-escaped username that receives the grants; it is
            interpolated directly into the GRANT statements

    """
    # Object names are interpolated into the GRANT text, so every query returns them through QUOTE_IDENT and the
    # privilege keywords only ever come from the hard-coded lists below.

    # Grant the database permissions
    # NOTE(review): TEMP looks like a synonym of TEMPORARY, which would make the third pass redundant - confirm
    # against the Redshift GRANT documentation before removing it.
    db_perm_types = ['CREATE', 'TEMPORARY', 'TEMP']
    for perm in db_perm_types:
        cur.execute("SELECT QUOTE_IDENT(dat.datname) as datname FROM pg_database dat WHERE HAS_DATABASE_PRIVILEGE(%s, dat.datname , %s)",
                    (current_username, perm))
        databases = [row.datname for row in cur.fetchall()]
        if databases:
            cur.execute("GRANT %s ON DATABASE %s TO %s" % (perm, ','.join(databases), pending_username))

    # Grant table permissions (tables in the pg_internal schema are skipped)
    table_perm_types = ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'REFERENCES']
    for perm in table_perm_types:
        cur.execute("SELECT QUOTE_IDENT(tab.schemaname) as schemaname, QUOTE_IDENT(tab.tablename) as tablename FROM pg_tables tab WHERE "
                    "HAS_TABLE_PRIVILEGE(%s, QUOTE_IDENT(tab.schemaname) + '.' + QUOTE_IDENT(tab.tablename) , %s) AND tab.schemaname NOT IN ('pg_internal')",
                    (current_username, perm))
        tables = [row.schemaname + '.' + row.tablename for row in cur.fetchall()]
        if tables:
            cur.execute("GRANT %s ON TABLE %s TO %s" % (perm, ','.join(tables), pending_username))

    # Grant schema permissions; candidate schemas are the distinct schema names listed in pg_tables
    schema_perm_types = ['CREATE', 'USAGE']
    for perm in schema_perm_types:
        cur.execute(
            "SELECT QUOTE_IDENT(schemaname) as schemaname FROM (SELECT DISTINCT schemaname FROM pg_tables) WHERE HAS_SCHEMA_PRIVILEGE(%s, schemaname, %s)",
            (current_username, perm))
        schemas = [row.schemaname for row in cur.fetchall()]
        if schemas:
            cur.execute("GRANT %s ON SCHEMA %s TO %s" % (perm, ','.join(schemas), pending_username))